refactor: refactor ddl procedure (#2304)

This commit is contained in:
Weny Xu
2023-09-02 13:04:18 +08:00
committed by Ruihang Xia
parent 4818887e38
commit fa5e3b94d3
16 changed files with 458 additions and 469 deletions

2
Cargo.lock generated
View File

@@ -1833,6 +1833,7 @@ dependencies = [
"chrono",
"common-catalog",
"common-error",
"common-procedure",
"common-runtime",
"common-telemetry",
"common-time",
@@ -1848,6 +1849,7 @@ dependencies = [
"serde_json",
"snafu",
"store-api",
"strum 0.25.0",
"table",
"tokio",
]

View File

@@ -10,9 +10,11 @@ async-stream.workspace = true
async-trait.workspace = true
common-catalog = { workspace = true }
common-error = { workspace = true }
common-procedure = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }
common-time = { workspace = true }
datatypes = { workspace = true }
etcd-client.workspace = true
futures.workspace = true
humantime-serde.workspace = true
@@ -23,6 +25,7 @@ serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api = { workspace = true }
strum.workspace = true
table = { workspace = true }
tokio.workspace = true

View File

@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::key::TableMetadataManagerRef;
pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod utils;
#[derive(Clone)]
pub struct DdlContext {
pub datanode_manager: DatanodeManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
}

View File

@@ -15,30 +15,27 @@
use std::vec;
use async_trait::async_trait;
use client::Database;
use common_meta::cache_invalidator::Context;
use common_meta::ident::TableIdent;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteValue;
use common_meta::rpc::ddl::AlterTableTask;
use common_meta::rpc::router::{find_leaders, RegionRoute};
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
};
use common_telemetry::{debug, info};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::requests::{AlterKind, AlterTableRequest};
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableMetadataManagerSnafu};
use crate::procedure::utils::handle_request_datanode_error;
use crate::error::{self, Result};
use crate::ident::TableIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;
// TODO(weny): removes in following PRs.
#[allow(dead_code)]
@@ -50,9 +47,9 @@ pub struct AlterTableProcedure {
// TODO(weny): removes in following PRs.
#[allow(dead_code)]
impl AlterTableProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
pub(crate) fn new(
pub fn new(
cluster_id: u64,
task: AlterTableTask,
alter_table_request: AlterTableRequest,
@@ -65,7 +62,7 @@ impl AlterTableProcedure {
}
}
pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(AlterTableProcedure { context, data })
@@ -85,8 +82,7 @@ impl AlterTableProcedure {
&request.schema_name,
new_table_name,
))
.await
.context(TableMetadataManagerSnafu)?;
.await?;
ensure!(
!exist,
@@ -107,13 +103,12 @@ impl AlterTableProcedure {
&request.schema_name,
&request.table_name,
))
.await
.context(TableMetadataManagerSnafu)?;
.await?;
ensure!(
exist,
error::TableNotFoundSnafu {
name: request.table_ref().to_string()
table_name: request.table_ref().to_string()
}
);
@@ -122,44 +117,6 @@ impl AlterTableProcedure {
Ok(Status::executing(true))
}
/// Alters table on datanode.
async fn on_datanode_alter_table(&mut self) -> Result<Status> {
let region_routes = self
.data
.region_routes
.as_ref()
.context(error::UnexpectedSnafu {
violated: "expected table_route",
})?;
let table_ref = self.data.table_ref();
let clients = self.context.datanode_clients.clone();
let leaders = find_leaders(region_routes);
let mut joins = Vec::with_capacity(leaders.len());
for datanode in leaders {
let client = clients.get_client(&datanode).await;
let client = Database::new(table_ref.catalog, table_ref.schema, client);
let expr = self.data.task.alter_table.clone();
joins.push(common_runtime::spawn_bg(async move {
debug!("Sending {:?} to {:?}", expr, client);
client
.alter(expr)
.await
.map_err(handle_request_datanode_error(datanode))
}));
}
let _ = join_all(joins)
.await
.into_iter()
.map(|e| e.context(error::JoinSnafu).flatten())
.collect::<Result<Vec<_>>>()?;
Ok(Status::Done)
}
/// Update table metadata for rename table operation.
async fn on_update_metadata_for_rename(&self, new_table_name: String) -> Result<()> {
let table_metadata_manager = &self.context.table_metadata_manager;
@@ -168,8 +125,7 @@ impl AlterTableProcedure {
table_metadata_manager
.rename_table(current_table_info_value, new_table_name)
.await
.context(error::TableMetadataManagerSnafu)?;
.await?;
Ok(())
}
@@ -180,8 +136,7 @@ impl AlterTableProcedure {
table_metadata_manager
.update_table_info(current_table_info_value, new_table_info)
.await
.context(error::TableMetadataManagerSnafu)?;
.await?;
Ok(())
}
@@ -241,8 +196,7 @@ impl AlterTableProcedure {
let TableRouteValue { region_routes, .. } = table_metadata_manager
.table_route_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)?
.await?
.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
@@ -272,8 +226,7 @@ impl AlterTableProcedure {
},
table_ident,
)
.await
.context(error::InvalidateTableCacheSnafu)?;
.await?;
self.data.state = AlterTableState::DatanodeAlterTable;
Ok(Status::executing(true))
@@ -320,7 +273,7 @@ impl Procedure for AlterTableProcedure {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
AlterTableState::DatanodeAlterTable => self.on_datanode_alter_table().await,
AlterTableState::DatanodeAlterTable => todo!(),
}
.map_err(error_handler)
}

View File

@@ -16,9 +16,6 @@ use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest};
use api::v1::SemanticType;
use async_trait::async_trait;
use common_meta::key::table_name::TableNameKey;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
@@ -30,20 +27,24 @@ use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use crate::ddl::utils::{handle_operate_region_error, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, PrimaryKeyNotFoundSnafu, Result, TableMetadataManagerSnafu};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::metrics;
use crate::procedure::utils::{handle_operate_region_error, handle_retry_error};
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
pub struct CreateTableProcedure {
context: DdlContext,
creator: TableCreator,
pub context: DdlContext,
pub creator: TableCreator,
}
#[allow(dead_code)]
impl CreateTableProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub(crate) fn new(
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
@@ -55,7 +56,7 @@ impl CreateTableProcedure {
}
}
pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(CreateTableProcedure {
context,
@@ -87,8 +88,7 @@ impl CreateTableProcedure {
&expr.schema_name,
&expr.table_name,
))
.await
.context(TableMetadataManagerSnafu)?;
.await?;
if exist {
ensure!(
@@ -106,7 +106,7 @@ impl CreateTableProcedure {
Ok(Status::executing(true))
}
fn create_region_request_template(&self) -> Result<PbCreateRegionRequest> {
pub fn create_region_request_template(&self) -> Result<PbCreateRegionRequest> {
let create_table_expr = &self.creator.data.task.create_table;
let column_defs = create_table_expr
@@ -146,7 +146,7 @@ impl CreateTableProcedure {
None
}
})
.context(PrimaryKeyNotFoundSnafu { key })
.context(error::PrimaryKeyNotFoundSnafu { key })
})
.collect::<Result<_>>()?;
@@ -162,7 +162,7 @@ impl CreateTableProcedure {
})
}
async fn on_datanode_create_regions(&mut self) -> Result<Status> {
pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
@@ -223,8 +223,7 @@ impl CreateTableProcedure {
let region_routes = self.region_routes().clone();
manager
.create_table_metadata(raw_table_info, region_routes)
.await
.context(TableMetadataManagerSnafu)?;
.await?;
info!("Created table metadata for table {table_id}");
Ok(Status::Done)
@@ -270,7 +269,7 @@ impl Procedure for CreateTableProcedure {
}
pub struct TableCreator {
data: CreateTableData,
pub data: CreateTableData,
}
impl TableCreator {
@@ -287,7 +286,7 @@ impl TableCreator {
}
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)]
enum CreateTableState {
pub enum CreateTableState {
/// Prepares to create the table
Prepare,
/// Creates regions on the Datanode
@@ -298,10 +297,10 @@ enum CreateTableState {
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
state: CreateTableState,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
cluster_id: u64,
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
pub cluster_id: u64,
}
impl CreateTableData {
@@ -309,167 +308,3 @@ impl CreateTableData {
self.task.table_ref()
}
}
#[cfg(test)]
mod test {
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr};
use common_catalog::consts::MITO2_ENGINE;
use super::*;
use crate::procedure::utils::mock::EchoRegionServer;
use crate::procedure::utils::test_data;
fn create_table_procedure() -> CreateTableProcedure {
let create_table_expr = CreateTableExpr {
catalog_name: "my_catalog".to_string(),
schema_name: "my_schema".to_string(),
table_name: "my_table".to_string(),
desc: "blabla".to_string(),
column_defs: vec![
PbColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
},
PbColumnDef {
name: "my_tag1".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
PbColumnDef {
name: "my_tag2".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
PbColumnDef {
name: "my_field_column".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
primary_keys: vec!["my_tag2".to_string(), "my_tag1".to_string()],
create_if_not_exists: false,
table_options: HashMap::new(),
table_id: None,
region_numbers: vec![1, 2, 3],
engine: MITO2_ENGINE.to_string(),
};
CreateTableProcedure::new(
1,
CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()),
test_data::new_region_routes(),
test_data::new_ddl_context(),
)
}
#[test]
fn test_create_region_request_template() {
let procedure = create_table_procedure();
let template = procedure.create_region_request_template().unwrap();
let expected = PbCreateRegionRequest {
region_id: 0,
engine: MITO2_ENGINE.to_string(),
column_defs: vec![
ColumnDef {
name: "ts".to_string(),
column_id: 0,
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
},
ColumnDef {
name: "my_tag1".to_string(),
column_id: 1,
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
},
ColumnDef {
name: "my_tag2".to_string(),
column_id: 2,
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
},
ColumnDef {
name: "my_field_column".to_string(),
column_id: 3,
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
},
],
primary_key: vec![2, 1],
create_if_not_exists: true,
catalog: String::new(),
schema: String::new(),
options: HashMap::new(),
};
assert_eq!(template, expected);
}
#[tokio::test]
async fn test_on_datanode_create_regions() {
let mut procedure = create_table_procedure();
let (region_server, mut rx) = EchoRegionServer::new();
let datanodes = find_leaders(&procedure.creator.data.region_routes);
for peer in datanodes {
let client = region_server.new_client(&peer);
procedure
.context
.datanode_clients
.insert_client(peer, client)
.await;
}
let expected_created_regions = Arc::new(Mutex::new(HashSet::from([
RegionId::new(42, 1),
RegionId::new(42, 2),
RegionId::new(42, 3),
])));
let handle = tokio::spawn({
let expected_created_regions = expected_created_regions.clone();
let mut max_recv = expected_created_regions.lock().unwrap().len();
async move {
while let Some(PbRegionRequest::Create(request)) = rx.recv().await {
let region_id = RegionId::from_u64(request.region_id);
expected_created_regions.lock().unwrap().remove(&region_id);
max_recv -= 1;
if max_recv == 0 {
break;
}
}
}
});
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(
procedure.creator.data.state,
CreateTableState::CreateMetadata
));
handle.await.unwrap();
assert!(expected_created_regions.lock().unwrap().is_empty());
}
}

View File

@@ -14,17 +14,8 @@
use api::v1::region::{region_request, DropRequest as PbDropRegionRequest};
use async_trait::async_trait;
use client::region::RegionRequester;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::cache_invalidator::Context;
use common_meta::ident::TableIdent;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteValue;
use common_meta::rpc::ddl::DropTableTask;
use common_meta::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
@@ -39,20 +30,29 @@ use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use super::utils::handle_retry_error;
use crate::cache_invalidator::Context;
use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableMetadataManagerSnafu};
use crate::error::{self, Result};
use crate::ident::TableIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::metrics;
use crate::procedure::utils::handle_request_datanode_error;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::table_name::TableName;
pub struct DropTableProcedure {
context: DdlContext,
data: DropTableData,
pub context: DdlContext,
pub data: DropTableData,
}
#[allow(dead_code)]
impl DropTableProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
pub(crate) fn new(
pub fn new(
cluster_id: u64,
task: DropTableTask,
table_route_value: TableRouteValue,
@@ -65,7 +65,7 @@ impl DropTableProcedure {
}
}
pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
@@ -82,13 +82,12 @@ impl DropTableProcedure {
table_ref.schema,
table_ref.table,
))
.await
.context(TableMetadataManagerSnafu)?;
.await?;
ensure!(
exist,
error::TableNotFoundSnafu {
name: table_ref.to_string()
table_name: table_ref.to_string()
}
);
@@ -106,8 +105,7 @@ impl DropTableProcedure {
table_metadata_manager
.delete_table_metadata(table_info_value, table_route_value)
.await
.context(error::TableMetadataManagerSnafu)?;
.await?;
info!("Deleted table metadata for table {table_id}");
@@ -137,15 +135,14 @@ impl DropTableProcedure {
},
table_ident,
)
.await
.context(error::InvalidateTableCacheSnafu)?;
.await?;
self.data.state = DropTableState::DatanodeDropRegions;
Ok(Status::executing(true))
}
async fn on_datanode_drop_regions(&self) -> Result<Status> {
pub async fn on_datanode_drop_regions(&self) -> Result<Status> {
let table_id = self.data.table_id();
let region_routes = &self.data.region_routes();
@@ -153,7 +150,7 @@ impl DropTableProcedure {
let mut drop_region_tasks = Vec::with_capacity(leaders.len());
for datanode in leaders {
let clients = self.context.datanode_clients.clone();
let clients = self.context.datanode_manager.clone();
let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
@@ -169,12 +166,11 @@ impl DropTableProcedure {
region_id: region_id.as_u64(),
});
let client = clients.get_client(&datanode).await;
let requester = RegionRequester::new(client);
let requester = clients.datanode(&datanode).await;
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_request_datanode_error(datanode)(err));
return Err(handle_operate_region_error(datanode)(err));
}
}
}
@@ -232,11 +228,11 @@ impl Procedure for DropTableProcedure {
#[derive(Debug, Serialize, Deserialize)]
pub struct DropTableData {
state: DropTableState,
cluster_id: u64,
task: DropTableTask,
table_route_value: TableRouteValue,
table_info_value: TableInfoValue,
pub state: DropTableState,
pub cluster_id: u64,
pub task: DropTableTask,
pub table_route_value: TableRouteValue,
pub table_info_value: TableInfoValue,
}
impl DropTableData {
@@ -277,7 +273,7 @@ impl DropTableData {
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum DropTableState {
pub enum DropTableState {
/// Prepares to drop the table
Prepare,
/// Removes metadata
@@ -287,71 +283,3 @@ enum DropTableState {
/// Drops regions on Datanode
DatanodeDropRegions,
}
#[cfg(test)]
mod test {
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use super::*;
use crate::procedure::utils::mock::EchoRegionServer;
use crate::procedure::utils::test_data;
#[tokio::test]
async fn test_on_datanode_drop_regions() {
let drop_table_task = DropTableTask {
catalog: "my_catalog".to_string(),
schema: "my_schema".to_string(),
table: "my_table".to_string(),
table_id: 42,
};
let procedure = DropTableProcedure::new(
1,
drop_table_task,
TableRouteValue::new(test_data::new_region_routes()),
TableInfoValue::new(test_data::new_table_info()),
test_data::new_ddl_context(),
);
let (region_server, mut rx) = EchoRegionServer::new();
let datanodes = find_leaders(&procedure.data.table_route_value.region_routes);
for peer in datanodes {
let client = region_server.new_client(&peer);
procedure
.context
.datanode_clients
.insert_client(peer, client)
.await;
}
let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([
RegionId::new(42, 1),
RegionId::new(42, 2),
RegionId::new(42, 3),
])));
let handle = tokio::spawn({
let expected_dropped_regions = expected_dropped_regions.clone();
let mut max_recv = expected_dropped_regions.lock().unwrap().len();
async move {
while let Some(region_request::Body::Drop(request)) = rx.recv().await {
let region_id = RegionId::from_u64(request.region_id);
expected_dropped_regions.lock().unwrap().remove(&region_id);
max_recv -= 1;
if max_recv == 0 {
break;
}
}
}
});
let status = procedure.on_datanode_drop_regions().await.unwrap();
assert!(matches!(status, Status::Done));
handle.await.unwrap();
assert!(expected_dropped_regions.lock().unwrap().is_empty());
}
}

View File

@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_procedure::error::Error as ProcedureError;
use snafu::{location, Location};
use crate::error::{self, Error};
use crate::peer::Peer;
pub fn handle_operate_region_error(datanode: Peer) -> impl FnOnce(crate::error::Error) -> Error {
move |err| {
if matches!(err, crate::error::Error::RetryLater { .. }) {
error::Error::RetryLater {
source: BoxedError::new(err),
}
} else {
error::Error::OperateDatanode {
location: location!(),
peer: datanode,
source: BoxedError::new(err),
}
}
}
}
pub fn handle_retry_error(e: Error) -> ProcedureError {
if matches!(e, error::Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
}

View File

@@ -21,9 +21,43 @@ use snafu::{Location, Snafu};
use store_api::storage::RegionNumber;
use table::metadata::TableId;
use crate::peer::Peer;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))]
ConvertRawTableInfo {
location: Location,
source: datatypes::Error,
},
#[snafu(display("Primary key '{key}' not found when creating region request, at {location}"))]
PrimaryKeyNotFound { key: String, location: Location },
#[snafu(display(
"Failed to build table meta for table: {}, source: {}",
table_name,
source
))]
BuildTableMeta {
table_name: String,
source: table::metadata::TableMetaBuilderError,
location: Location,
},
#[snafu(display("Table occurs error, source: {}", source))]
Table {
location: Location,
source: table::error::Error,
},
#[snafu(display("Table route not found: {}", table_name))]
TableRouteNotFound {
table_name: String,
location: Location,
},
#[snafu(display("Failed to decode protobuf, source: {}", source))]
DecodeProto {
location: Location,
@@ -77,9 +111,9 @@ pub enum Error {
#[snafu(display("Unexpected: {err_msg}"))]
Unexpected { err_msg: String, location: Location },
#[snafu(display("Table already exists, table_id: {}", table_id))]
#[snafu(display("Table already exists, table: {}", table_name))]
TableAlreadyExists {
table_id: TableId,
table_name: String,
location: Location,
},
@@ -99,8 +133,8 @@ pub enum Error {
source: Utf8Error,
},
#[snafu(display("Table does not exist, table_name: {}", table_name))]
TableNotExist {
#[snafu(display("Table nod found, table: {}", table_name))]
TableNotFound {
table_name: String,
location: Location,
},
@@ -163,6 +197,13 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to operate on datanode: {}, source: {}", peer, source))]
OperateDatanode {
location: Location,
peer: Peer,
source: BoxedError,
},
#[snafu(display("Retry later, source: {}", source))]
RetryLater { source: BoxedError },
}
@@ -188,18 +229,26 @@ impl ErrorExt for Error {
SendMessage { .. }
| GetKvCache { .. }
| CacheNotGet { .. }
| TableAlreadyExists { .. }
| CatalogAlreadyExists { .. }
| SchemaAlreadyExists { .. }
| TableNotExist { .. }
| RenameTable { .. } => StatusCode::Internal,
PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
EncodeJson { .. }
| DecodeJson { .. }
| PayloadNotExist { .. }
| ConvertRawKey { .. }
| DecodeProto { .. } => StatusCode::Unexpected,
| DecodeProto { .. }
| BuildTableMeta { .. }
| TableRouteNotFound { .. }
| ConvertRawTableInfo { .. } => StatusCode::Unexpected,
OperateDatanode { source, .. } => source.status_code(),
Table { source, .. } => source.status_code(),
RetryLater { source, .. } => source.status_code(),
OperateRegion { source, .. } => source.status_code(),
ExecuteDdl { source, .. } => source.status_code(),

View File

@@ -16,6 +16,7 @@
pub mod cache_invalidator;
pub mod datanode_manager;
pub mod ddl;
pub mod ddl_manager;
pub mod error;
pub mod heartbeat;

View File

@@ -13,3 +13,6 @@
// limitations under the License.
pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request";
pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table";
pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table";

View File

@@ -16,7 +16,10 @@ use std::sync::Arc;
use client::client_manager::DatanodeClients;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::alter_table::AlterTableProcedure;
use common_meta::ddl::create_table::CreateTableProcedure;
use common_meta::ddl::drop_table::DropTableProcedure;
use common_meta::ddl::DdlContext;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
@@ -31,9 +34,6 @@ use crate::error::{
RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, UnsupportedSnafu,
WaitProcedureSnafu,
};
use crate::procedure::alter_table::AlterTableProcedure;
use crate::procedure::create_table::CreateTableProcedure;
use crate::procedure::drop_table::DropTableProcedure;
pub type DdlManagerRef = Arc<DdlManager>;
@@ -45,16 +45,6 @@ pub struct DdlManager {
pub(crate) table_metadata_manager: TableMetadataManagerRef,
}
#[derive(Clone)]
pub(crate) struct DdlContext {
// TODO(weny): removes it
pub(crate) datanode_clients: Arc<DatanodeClients>,
pub(crate) datanode_manager: DatanodeManagerRef,
pub(crate) cache_invalidator: CacheInvalidatorRef,
pub(crate) table_metadata_manager: TableMetadataManagerRef,
}
impl DdlManager {
pub(crate) fn new(
procedure_manager: ProcedureManagerRef,
@@ -73,9 +63,7 @@ impl DdlManager {
pub(crate) fn create_context(&self) -> DdlContext {
DdlContext {
datanode_manager: self.datanode_clients.clone(),
datanode_clients: self.datanode_clients.clone(),
cache_invalidator: self.cache_invalidator.clone(),
table_metadata_manager: self.table_metadata_manager.clone(),
}
}

View File

@@ -64,29 +64,6 @@ pub enum Error {
source: common_grpc_expr::error::Error,
},
#[snafu(display(
"Failed to build table meta for table: {}, source: {}",
table_name,
source
))]
BuildTableMeta {
table_name: String,
source: table::metadata::TableMetaBuilderError,
location: Location,
},
#[snafu(display("Table occurs error, source: {}", source))]
Table {
location: Location,
source: table::error::Error,
},
#[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))]
ConvertRawTableInfo {
location: Location,
source: datatypes::Error,
},
#[snafu(display("Failed to execute transaction: {}", msg))]
Txn { location: Location, msg: String },
@@ -529,9 +506,6 @@ pub enum Error {
operation: String,
location: Location,
},
#[snafu(display("Primary key '{key}' not found when creating region request, at {location}"))]
PrimaryKeyNotFound { key: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -586,7 +560,6 @@ impl ErrorExt for Error {
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InvalidHeartbeatRequest { .. }
| Error::PrimaryKeyNotFound { .. }
| Error::TooManyPartitions { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
@@ -604,12 +577,9 @@ impl ErrorExt for Error {
| Error::UnexpectedInstructionReply { .. }
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. }
| Error::ConvertRawTableInfo { .. }
| Error::BuildTableMeta { .. } => StatusCode::Unexpected,
| Error::TableIdChanged { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::Table { source, .. } => source.status_code(),
Error::RequestDatanode { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::RecoverProcedure { source, .. }

View File

@@ -18,6 +18,3 @@ pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";
pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table";
pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table";

View File

@@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod region_failover;
pub(crate) mod state_store;
#[cfg(test)]
mod tests;
mod utils;

View File

@@ -0,0 +1,240 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use api::v1::region::region_request::{self, Body as PbRegionRequest};
use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr, SemanticType};
use client::client_manager::DatanodeClients;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::ddl::create_table::*;
use common_meta::ddl::drop_table::DropTableProcedure;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::rpc::ddl::{CreateTableTask, DropTableTask};
use common_meta::rpc::router::find_leaders;
use common_procedure::Status;
use store_api::storage::RegionId;
use crate::procedure::utils::mock::EchoRegionServer;
use crate::procedure::utils::test_data;
fn create_table_procedure() -> CreateTableProcedure {
let create_table_expr = CreateTableExpr {
catalog_name: "my_catalog".to_string(),
schema_name: "my_schema".to_string(),
table_name: "my_table".to_string(),
desc: "blabla".to_string(),
column_defs: vec![
PbColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
},
PbColumnDef {
name: "my_tag1".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
PbColumnDef {
name: "my_tag2".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
PbColumnDef {
name: "my_field_column".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
primary_keys: vec!["my_tag2".to_string(), "my_tag1".to_string()],
create_if_not_exists: false,
table_options: HashMap::new(),
table_id: None,
region_numbers: vec![1, 2, 3],
engine: MITO2_ENGINE.to_string(),
};
CreateTableProcedure::new(
1,
CreateTableTask::new(create_table_expr, vec![], test_data::new_table_info()),
test_data::new_region_routes(),
test_data::new_ddl_context(),
)
}
#[test]
fn test_create_region_request_template() {
let procedure = create_table_procedure();
let template = procedure.create_region_request_template().unwrap();
let expected = PbCreateRegionRequest {
region_id: 0,
engine: MITO2_ENGINE.to_string(),
column_defs: vec![
ColumnDef {
name: "ts".to_string(),
column_id: 0,
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
semantic_type: SemanticType::Timestamp as i32,
},
ColumnDef {
name: "my_tag1".to_string(),
column_id: 1,
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
},
ColumnDef {
name: "my_tag2".to_string(),
column_id: 2,
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Tag as i32,
},
ColumnDef {
name: "my_field_column".to_string(),
column_id: 3,
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
semantic_type: SemanticType::Field as i32,
},
],
primary_key: vec![2, 1],
create_if_not_exists: true,
catalog: String::new(),
schema: String::new(),
options: HashMap::new(),
};
assert_eq!(template, expected);
}
#[tokio::test]
async fn test_on_datanode_create_regions() {
let mut procedure = create_table_procedure();
let (region_server, mut rx) = EchoRegionServer::new();
let datanodes = find_leaders(&procedure.creator.data.region_routes);
let datanode_clients = DatanodeClients::default();
for peer in datanodes {
let client = region_server.new_client(&peer);
datanode_clients.insert_client(peer, client).await;
}
procedure.context.datanode_manager = Arc::new(datanode_clients);
let expected_created_regions = Arc::new(Mutex::new(HashSet::from([
RegionId::new(42, 1),
RegionId::new(42, 2),
RegionId::new(42, 3),
])));
let handle = tokio::spawn({
let expected_created_regions = expected_created_regions.clone();
let mut max_recv = expected_created_regions.lock().unwrap().len();
async move {
while let Some(PbRegionRequest::Create(request)) = rx.recv().await {
let region_id = RegionId::from_u64(request.region_id);
expected_created_regions.lock().unwrap().remove(&region_id);
max_recv -= 1;
if max_recv == 0 {
break;
}
}
}
});
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(
procedure.creator.data.state,
CreateTableState::CreateMetadata
));
handle.await.unwrap();
assert!(expected_created_regions.lock().unwrap().is_empty());
}
#[tokio::test]
async fn test_on_datanode_drop_regions() {
let drop_table_task = DropTableTask {
catalog: "my_catalog".to_string(),
schema: "my_schema".to_string(),
table: "my_table".to_string(),
table_id: 42,
};
let mut procedure = DropTableProcedure::new(
1,
drop_table_task,
TableRouteValue::new(test_data::new_region_routes()),
TableInfoValue::new(test_data::new_table_info()),
test_data::new_ddl_context(),
);
let (region_server, mut rx) = EchoRegionServer::new();
let datanodes = find_leaders(&procedure.data.table_route_value.region_routes);
let datanode_clients = DatanodeClients::default();
for peer in datanodes {
let client = region_server.new_client(&peer);
datanode_clients.insert_client(peer, client).await;
}
procedure.context.datanode_manager = Arc::new(datanode_clients);
let expected_dropped_regions = Arc::new(Mutex::new(HashSet::from([
RegionId::new(42, 1),
RegionId::new(42, 2),
RegionId::new(42, 3),
])));
let handle = tokio::spawn({
let expected_dropped_regions = expected_dropped_regions.clone();
let mut max_recv = expected_dropped_regions.lock().unwrap().len();
async move {
while let Some(region_request::Body::Drop(request)) = rx.recv().await {
let region_id = RegionId::from_u64(request.region_id);
expected_dropped_regions.lock().unwrap().remove(&region_id);
max_recv -= 1;
if max_recv == 0 {
break;
}
}
}
});
let status = procedure.on_datanode_drop_regions().await.unwrap();
assert!(matches!(status, Status::Done));
handle.await.unwrap();
assert!(expected_dropped_regions.lock().unwrap().is_empty());
}

View File

@@ -12,57 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::peer::Peer;
use common_procedure::error::Error as ProcedureError;
use snafu::{location, Location};
use crate::error::{self, Error};
pub fn handle_request_datanode_error(datanode: Peer) -> impl FnOnce(client::error::Error) -> Error {
move |err| {
if matches!(err, client::error::Error::FlightGet { .. }) {
error::RetryLaterSnafu {
reason: format!("Failed to execute operation on datanode, source: {}", err),
}
.build()
} else {
error::Error::RequestDatanode {
location: location!(),
peer: datanode,
source: err,
}
}
}
}
pub fn handle_operate_region_error(
datanode: Peer,
) -> impl FnOnce(common_meta::error::Error) -> Error {
move |err| {
if matches!(err, common_meta::error::Error::RetryLater { .. }) {
error::RetryLaterSnafu {
reason: format!("Failed to execute operation on datanode, source: {}", err),
}
.build()
} else {
error::Error::OperateRegion {
location: location!(),
peer: datanode,
source: BoxedError::new(err),
}
}
}
}
pub fn handle_retry_error(e: Error) -> ProcedureError {
if matches!(e, error::Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
}
#[cfg(test)]
pub mod mock {
use std::io::Error;
@@ -157,6 +106,7 @@ pub mod test_data {
use chrono::DateTime;
use client::client_manager::DatanodeClients;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::ddl::DdlContext;
use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
@@ -166,7 +116,6 @@ pub mod test_data {
use table::requests::TableOptions;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::ddl::DdlContext;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::metasrv::MetasrvInfo;
use crate::sequence::Sequence;
@@ -244,7 +193,6 @@ pub mod test_data {
let kv_backend = KvBackendAdapter::wrap(kv_store);
let clients = Arc::new(DatanodeClients::default());
DdlContext {
datanode_clients: clients.clone(),
datanode_manager: clients,
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
mailbox,