refactor: refactor CreateTableProcedure to extract reusable components (#7526)

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-01-07 09:58:53 +08:00
committed by GitHub
parent ada4666e10
commit 77310ec5bd
9 changed files with 379 additions and 264 deletions

View File

@@ -36,8 +36,7 @@ pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub(crate) use create_table_template::{CreateRequestBuilder, build_template_from_raw_table_info};
pub(crate) use create_table::{CreateRequestBuilder, build_template_from_raw_table_info};
pub mod create_view;
pub mod drop_database;
pub mod drop_flow;

View File

@@ -22,7 +22,7 @@ use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table_template::{
use crate::ddl::create_table::template::{
CreateRequestBuilder, build_template, build_template_from_raw_table_info,
};
use crate::ddl::utils::region_storage_path;

View File

@@ -12,74 +12,99 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod executor;
pub(crate) mod template;
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{RegionRequest, RegionRequestHeader};
use api::v1::CreateTableExpr;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use futures::future::join_all;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use store_api::storage::RegionNumber;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use table::table_reference::TableReference;
pub(crate) use template::{CreateRequestBuilder, build_template_from_raw_table_info};
use crate::ddl::create_table_template::{CreateRequestBuilder, build_template};
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
extract_column_metadatas, map_to_procedure_error, region_storage_path,
};
use crate::ddl::create_table::executor::CreateTableExecutor;
use crate::ddl::create_table::template::build_template;
use crate::ddl::utils::map_to_procedure_error;
use crate::ddl::{DdlContext, TableMetadata};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{
RegionRoute, find_leader_regions, find_leaders, operating_leader_regions,
};
use crate::rpc::router::{RegionRoute, operating_leader_regions};
pub struct CreateTableProcedure {
pub context: DdlContext,
pub creator: TableCreator,
/// The serializable data.
pub data: CreateTableData,
/// The guards of opening.
pub opening_regions: Vec<OperatingRegionGuard>,
/// The executor of the procedure.
pub executor: CreateTableExecutor,
}
fn build_executor_from_create_table_data(
create_table_expr: &CreateTableExpr,
) -> Result<CreateTableExecutor> {
let template = build_template(create_table_expr)?;
let builder = CreateRequestBuilder::new(template, None);
let table_name = TableName::new(
create_table_expr.catalog_name.clone(),
create_table_expr.schema_name.clone(),
create_table_expr.table_name.clone(),
);
let executor =
CreateTableExecutor::new(table_name, create_table_expr.create_if_not_exists, builder);
Ok(executor)
}
impl CreateTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub fn new(task: CreateTableTask, context: DdlContext) -> Self {
Self {
pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
let executor = build_executor_from_create_table_data(&task.create_table)?;
Ok(Self {
context,
creator: TableCreator::new(task),
}
data: CreateTableData::new(task),
opening_regions: vec![],
executor,
})
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let create_table_expr = &data.task.create_table;
let executor = build_executor_from_create_table_data(create_table_expr)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
Ok(CreateTableProcedure {
context,
creator: TableCreator {
data,
opening_regions: vec![],
},
data,
opening_regions: vec![],
executor,
})
}
fn table_info(&self) -> &RawTableInfo {
&self.creator.data.task.table_info
&self.data.task.table_info
}
pub(crate) fn table_id(&self) -> TableId {
@@ -87,8 +112,7 @@ impl CreateTableProcedure {
}
fn region_wal_options(&self) -> Result<&HashMap<RegionNumber, String>> {
self.creator
.data
self.data
.region_wal_options
.as_ref()
.context(error::UnexpectedSnafu {
@@ -97,8 +121,7 @@ impl CreateTableProcedure {
}
fn table_route(&self) -> Result<&PhysicalTableRouteValue> {
self.creator
.data
self.data
.table_route
.as_ref()
.context(error::UnexpectedSnafu {
@@ -106,17 +129,6 @@ impl CreateTableProcedure {
})
}
#[cfg(any(test, feature = "testing"))]
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options)
}
/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Allocates the table id.
@@ -125,31 +137,16 @@ impl CreateTableProcedure {
/// - TableName exists and `create_if_not_exists` is false.
/// - Failed to allocate [TableMetadata].
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
let table_name_value = self
.context
.table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(
&expr.catalog_name,
&expr.schema_name,
&expr.table_name,
))
let table_id = self
.executor
.on_prepare(&self.context.table_metadata_manager)
.await?;
if let Some(value) = table_name_value {
ensure!(
expr.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.creator.data.table_ref().to_string(),
}
);
let table_id = value.table_id();
// Return the table id if the table already exists.
if let Some(table_id) = table_id {
return Ok(Status::done_with_output(table_id));
}
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
self.data.state = CreateTableState::DatanodeCreateRegions;
let TableMetadata {
table_id,
table_route,
@@ -157,23 +154,13 @@ impl CreateTableProcedure {
} = self
.context
.table_metadata_allocator
.create(&self.creator.data.task)
.create(&self.data.task)
.await?;
self.creator
.set_allocated_metadata(table_id, table_route, region_wal_options);
self.set_allocated_metadata(table_id, table_route, region_wal_options);
Ok(Status::executing(true))
}
pub fn new_region_request_builder(
&self,
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;
let template = build_template(create_table_expr)?;
Ok(CreateRequestBuilder::new(template, physical_table_id))
}
/// Creates regions on datanodes
///
/// Abort(non-retry):
@@ -187,90 +174,29 @@ impl CreateTableProcedure {
/// - [Code::Unavailable](tonic::status::Code::Unavailable)
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let table_route = self.table_route()?.clone();
let request_builder = self.new_region_request_builder(None)?;
// Registers opening regions
let guards = self
.creator
.register_opening_regions(&self.context, &table_route.region_routes)?;
let guards = self.register_opening_regions(&self.context, &table_route.region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
self.opening_regions = guards;
}
self.create_regions(&table_route.region_routes, request_builder)
.await
self.create_regions(&table_route.region_routes).await
}
async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
let create_table_data = &self.creator.data;
// Safety: the region_wal_options must be allocated
async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result<Status> {
let table_id = self.table_id();
let region_wal_options = self.region_wal_options()?;
let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
let column_metadatas = self
.executor
.on_create_regions(
&self.context.node_manager,
table_id,
region_routes,
region_wal_options,
)
.await?;
let partition_exprs = region_routes
.iter()
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
.collect();
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
&partition_exprs,
);
requests.push(PbRegionRequest::Create(create_region_request));
}
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),
};
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
let mut results = join_all(create_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
if let Some(column_metadatas) =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
{
self.creator.data.column_metadatas = column_metadatas;
} else {
warn!(
"creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
);
}
self.creator.data.state = CreateTableState::CreateMetadata;
self.data.column_metadatas = column_metadatas;
self.data.state = CreateTableState::CreateMetadata;
Ok(Status::executing(true))
}
@@ -280,107 +206,33 @@ impl CreateTableProcedure {
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
let table_id = self.table_id();
let table_ref = self.creator.data.table_ref();
let table_ref = self.data.table_ref();
let manager = &self.context.table_metadata_manager;
let mut raw_table_info = self.table_info().clone();
if !self.creator.data.column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, &self.creator.data.column_metadatas);
}
let raw_table_info = self.table_info().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let physical_table_route = self.table_route()?.clone();
let detecting_regions =
convert_region_routes_to_detecting_regions(&physical_table_route.region_routes);
let table_route = TableRouteValue::Physical(physical_table_route);
manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
self.executor
.on_create_metadata(
manager,
&self.context.region_failure_detector_controller,
raw_table_info,
&self.data.column_metadatas,
physical_table_route,
region_wal_options,
)
.await?;
self.context
.register_failure_detectors(detecting_regions)
.await;
info!(
"Successfully created table: {}, table_id: {}, procedure_id: {}",
table_ref, table_id, pid
);
self.creator.opening_regions.clear();
self.opening_regions.clear();
Ok(Status::done_with_output(table_id))
}
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.creator.data.table_route {
self.creator.opening_regions = self
.creator
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}
pub struct TableCreator {
/// The serializable data.
pub data: CreateTableData,
/// The guards of opening.
pub opening_regions: Vec<OperatingRegionGuard>,
}
impl TableCreator {
pub fn new(task: CreateTableTask) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
column_metadatas: vec![],
task,
table_route: None,
region_wal_options: None,
},
opening_regions: vec![],
}
}
/// Registers and returns the guards of the opening region if they don't exist.
fn register_opening_regions(
@@ -389,7 +241,6 @@ impl TableCreator {
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let opening_regions = operating_leader_regions(region_routes);
if self.opening_regions.len() == opening_regions.len() {
return Ok(vec![]);
}
@@ -409,7 +260,7 @@ impl TableCreator {
Ok(opening_region_guards)
}
fn set_allocated_metadata(
pub fn set_allocated_metadata(
&mut self,
table_id: TableId,
table_route: PhysicalTableRouteValue,
@@ -421,6 +272,56 @@ impl TableCreator {
}
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.data.table_route {
self.opening_regions = self
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())
}
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
.with_label_values(&[state.as_ref()])
.start_timer();
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateTableState {
/// Prepares to create the table
@@ -444,6 +345,16 @@ pub struct CreateTableData {
}
impl CreateTableData {
pub fn new(task: CreateTableTask) -> Self {
CreateTableData {
state: CreateTableState::Prepare,
column_metadatas: vec![],
task,
table_route: None,
region_wal_options: None,
}
}
fn table_ref(&self) -> TableReference<'_> {
self.task.table_ref()
}

View File

@@ -0,0 +1,203 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{RegionRequest, RegionRequestHeader};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use futures::future::join_all;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, TableId};
use table::table_name::TableName;
use crate::ddl::utils::raw_table_info::update_table_info_column_ids;
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions,
extract_column_metadatas, region_storage_path,
};
use crate::ddl::{CreateRequestBuilder, RegionFailureDetectorControllerRef};
use crate::error::{self, Result};
use crate::key::TableMetadataManagerRef;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
use crate::node_manager::NodeManagerRef;
use crate::rpc::router::{RegionRoute, find_leader_regions, find_leaders};
/// [CreateTableExecutor] performs:
/// - Creates the metadata of the table.
/// - Creates the regions on the Datanode nodes.
pub struct CreateTableExecutor {
create_if_not_exists: bool,
table_name: TableName,
builder: CreateRequestBuilder,
}
impl CreateTableExecutor {
/// Creates a new [`CreateTableExecutor`].
pub fn new(
table_name: TableName,
create_if_not_exists: bool,
builder: CreateRequestBuilder,
) -> Self {
Self {
create_if_not_exists,
table_name,
builder,
}
}
/// On the prepare step, it performs:
/// - Checks whether the table exists.
/// - Returns the table id if the table exists.
///
/// Abort(non-retry):
/// - Table exists and `create_if_not_exists` is `false`.
/// - Failed to get the table name value.
pub async fn on_prepare(
&self,
table_metadata_manager: &TableMetadataManagerRef,
) -> Result<Option<TableId>> {
let table_name_value = table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(
&self.table_name.catalog_name,
&self.table_name.schema_name,
&self.table_name.table_name,
))
.await?;
if let Some(value) = table_name_value {
ensure!(
self.create_if_not_exists,
error::TableAlreadyExistsSnafu {
table_name: self.table_name.to_string(),
}
);
return Ok(Some(value.table_id()));
}
Ok(None)
}
pub async fn on_create_regions(
&self,
node_manager: &NodeManagerRef,
table_id: TableId,
region_routes: &[RegionRoute],
region_wal_options: &HashMap<RegionNumber, String>,
) -> Result<Vec<ColumnMetadata>> {
let storage_path =
region_storage_path(&self.table_name.catalog_name, &self.table_name.schema_name);
let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
let partition_exprs = region_routes
.iter()
.map(|r| (r.region.id.region_number(), r.region.partition_expr()))
.collect::<HashMap<_, _>>();
for datanode in leaders {
let requester = node_manager.datanode(&datanode).await;
let regions = find_leader_regions(region_routes, &datanode);
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(table_id, region_number);
let create_region_request = self.builder.build_one(
region_id,
storage_path.clone(),
region_wal_options,
&partition_exprs,
);
requests.push(PbRegionRequest::Create(create_region_request));
}
for request in requests {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(request),
};
let datanode = datanode.clone();
let requester = requester.clone();
create_region_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
let mut results = join_all(create_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
let column_metadatas = if let Some(column_metadatas) =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
{
column_metadatas
} else {
warn!(
"creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"
);
vec![]
};
Ok(column_metadatas)
}
/// Creates table metadata
///
/// Abort(non-retry):
/// - Failed to create table metadata.
pub async fn on_create_metadata(
&self,
table_metadata_manager: &TableMetadataManagerRef,
region_failure_detector_controller: &RegionFailureDetectorControllerRef,
mut raw_table_info: RawTableInfo,
column_metadatas: &[ColumnMetadata],
table_route: PhysicalTableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Result<()> {
if !column_metadatas.is_empty() {
update_table_info_column_ids(&mut raw_table_info, column_metadatas);
}
let detecting_regions =
convert_region_routes_to_detecting_regions(&table_route.region_routes);
let table_route = TableRouteValue::Physical(table_route);
table_metadata_manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
region_failure_detector_controller
.register_failure_detectors(detecting_regions)
.await;
Ok(())
}
/// Returns the builder of the executor.
pub fn builder(&self) -> &CreateRequestBuilder {
&self.builder
}
}

View File

@@ -162,7 +162,7 @@ async fn test_on_prepare_table_exists_err() {
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, Error::TableAlreadyExists { .. });
assert_eq!(err.status_code(), StatusCode::TableAlreadyExists);
@@ -185,7 +185,7 @@ async fn test_on_prepare_with_create_if_table_exists() {
)
.await
.unwrap();
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Done { output: Some(..) });
let table_id = *status.downcast_output_ref::<u32>().unwrap();
@@ -198,7 +198,7 @@ async fn test_on_prepare_without_create_if_table_exists() {
let ddl_context = new_ddl_context(node_manager);
let mut task = test_create_table_task("foo");
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
let status = procedure.on_prepare().await.unwrap();
assert_matches!(
status,
@@ -217,7 +217,7 @@ async fn test_on_datanode_create_regions_should_retry() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -234,7 +234,7 @@ async fn test_on_datanode_create_regions_should_not_retry() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let mut procedure = CreateTableProcedure::new(task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -251,7 +251,7 @@ async fn test_on_create_metadata_error() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task.clone(), ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -284,7 +284,7 @@ async fn test_on_create_metadata() {
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("foo");
assert!(!task.create_table.create_if_not_exists);
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
@@ -312,16 +312,16 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);
let task = test_create_table_task("foo");
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
execute_procedure_until(&mut procedure, |p| {
p.creator.data.state == CreateTableState::CreateMetadata
p.data.state == CreateTableState::CreateMetadata
})
.await;
// Ensure that after running to the state `CreateMetadata`(just past `DatanodeCreateRegions`),
// the opening regions should be recorded:
let guards = &procedure.creator.opening_regions;
let guards = &procedure.opening_regions;
assert_eq!(guards.len(), 1);
let (datanode_id, region_id) = (0, RegionId::new(procedure.table_id(), 0));
assert_eq!(guards[0].info(), (datanode_id, region_id));
@@ -334,7 +334,7 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
execute_procedure_until_done(&mut procedure).await;
// Ensure that when run to the end, the opening regions should be cleared:
let guards = &procedure.creator.opening_regions;
let guards = &procedure.opening_regions;
assert!(guards.is_empty());
assert!(
!ddl_context

View File

@@ -259,7 +259,7 @@ async fn test_replace_table() {
{
// Create a `foo` table.
let task = test_create_table_task("foo");
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone());
let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),

View File

@@ -231,7 +231,7 @@ impl DdlManager {
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateTableProcedure::new(create_table_task, context);
let procedure = CreateTableProcedure::new(create_table_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

View File

@@ -92,7 +92,8 @@ fn test_region_request_builder() {
let mut procedure = CreateTableProcedure::new(
create_table_task(None),
test_data::new_ddl_context(Arc::new(NodeClients::default())),
);
)
.unwrap();
procedure.set_allocated_metadata(
1024,
@@ -100,7 +101,7 @@ fn test_region_request_builder() {
HashMap::default(),
);
let template = procedure.new_region_request_builder(None).unwrap();
let template = procedure.executor.builder();
let expected = PbCreateRegionRequest {
region_id: 0,
@@ -187,7 +188,8 @@ async fn test_on_datanode_create_regions() {
let mut procedure = CreateTableProcedure::new(
create_table_task(None),
test_data::new_ddl_context(node_manager),
);
)
.unwrap();
procedure.set_allocated_metadata(
42,
@@ -226,7 +228,7 @@ async fn test_on_datanode_create_regions() {
}
));
assert!(matches!(
procedure.creator.data.state,
procedure.data.state,
CreateTableState::CreateMetadata
));