refactor: refactor ddl manager (#2306)

* refactor: refactor ddl manager

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-09-05 11:26:24 +08:00
committed by Ruihang Xia
parent 7dde9ce3ce
commit 922e342b63
15 changed files with 668 additions and 544 deletions

View File

@@ -10,6 +10,7 @@ async-stream.workspace = true
async-trait.workspace = true
common-catalog = { workspace = true }
common-error = { workspace = true }
common-grpc-expr = { workspace = true }
common-procedure = { workspace = true }
common-runtime = { workspace = true }
common-telemetry = { workspace = true }

View File

@@ -12,15 +12,56 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::Partition;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;
pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod utils;
/// Per-call context handed to [`DdlExecutor::submit_ddl_task`] alongside the request.
#[derive(Debug, Default)]
pub struct ExecutorContext {
/// Optional cluster identifier; `None` when built through the derived `Default`.
pub cluster_id: Option<u64>,
}
/// Something that can accept a DDL task and report the outcome of submitting it.
///
/// Implementations must be `Send + Sync` so they can be shared as a trait object.
#[async_trait::async_trait]
pub trait DdlExecutor: Send + Sync {
/// Submits the DDL task carried by `request`.
///
/// `ctx` carries per-call information such as the (optional) cluster id.
/// Returns the [`SubmitDdlTaskResponse`] describing the submitted task, or an
/// error from the crate's `Result` type.
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
}
/// Shared, reference-counted handle to a [`DdlExecutor`] trait object.
pub type DdlExecutorRef = Arc<dyn DdlExecutor>;
/// Per-call context handed to [`TableCreator::create`].
pub struct TableCreatorContext {
/// Cluster identifier forwarded to the creator.
pub cluster_id: u64,
}
/// Produces the table id and region routes for a table described by a
/// [`RawTableInfo`] and its partitions.
///
/// Implementations must be `Send + Sync` so they can be shared as a trait object.
#[async_trait::async_trait]
pub trait TableCreator: Send + Sync {
/// Creates the table described by `table_info` and `partitions`.
///
/// `table_info` is taken mutably, so an implementation may modify it.
/// NOTE(review): presumably to record the allocated table id — confirm
/// against the concrete implementations.
///
/// Returns the table id together with the region routes of the new table.
async fn create(
&self,
ctx: &TableCreatorContext,
table_info: &mut RawTableInfo,
partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)>;
}
/// Shared, reference-counted handle to a [`TableCreator`] trait object.
pub type TableCreatorRef = Arc<dyn TableCreator>;
#[derive(Clone)]
pub struct DdlContext {
pub datanode_manager: DatanodeManagerRef,

View File

@@ -14,18 +14,383 @@
use std::sync::Arc;
use crate::error::Result;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc_expr::alter_expr_to_request;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use table::requests::AlterTableRequest;
pub struct Context;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::{DdlContext, DdlExecutor, ExecutorContext, TableCreatorContext, TableCreatorRef};
use crate::error::{
self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, UnsupportedSnafu,
WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable};
use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
#[async_trait::async_trait]
pub trait DdlExecutor: Send + Sync {
async fn submit_ddl_task(
&self,
ctx: &Context,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;
/// Turns DDL tasks into procedures and runs them through the procedure manager.
pub struct DdlManager {
// Used to register procedure loaders and to submit procedures.
procedure_manager: ProcedureManagerRef,
// Cloned into every `DdlContext` handed to a procedure.
datanode_manager: DatanodeManagerRef,
// Cloned into every `DdlContext` handed to a procedure.
cache_invalidator: CacheInvalidatorRef,
// Source of table info / table route metadata; also cloned into `DdlContext`.
table_metadata_manager: TableMetadataManagerRef,
// Invoked when handling a create-table task to obtain the table id and region routes.
table_creator: TableCreatorRef,
}
pub type DdlExecutorRef = Arc<dyn DdlExecutor>;
impl DdlManager {
    /// Assembles a manager from its collaborators.
    ///
    /// `datanode_clients` is stored as the manager's datanode manager.
    pub fn new(
        procedure_manager: ProcedureManagerRef,
        datanode_clients: DatanodeManagerRef,
        cache_invalidator: CacheInvalidatorRef,
        table_metadata_manager: TableMetadataManagerRef,
        table_creator: TableCreatorRef,
    ) -> Self {
        let datanode_manager = datanode_clients;
        Self {
            procedure_manager,
            datanode_manager,
            cache_invalidator,
            table_metadata_manager,
            table_creator,
        }
    }

    /// Borrows the table metadata manager held by this manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    /// Builds a fresh [`DdlContext`] by cloning the shared handles held by this manager.
    pub fn create_context(&self) -> DdlContext {
        let datanode_manager = self.datanode_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        DdlContext {
            datanode_manager,
            cache_invalidator,
            table_metadata_manager,
        }
    }

    /// Registers the loaders for the create, drop and alter table procedures.
    ///
    /// Each loader rebuilds its procedure from JSON, passing along a clone of a
    /// [`DdlContext`] created here.
    ///
    /// # Errors
    ///
    /// Returns a `RegisterProcedureLoader` error (carrying the procedure type
    /// name) for the first registration that fails.
    pub fn try_start(&self) -> Result<()> {
        // Create table.
        let create_ctx = self.create_context();
        self.procedure_manager
            .register_loader(
                CreateTableProcedure::TYPE_NAME,
                Box::new(move |json| {
                    CreateTableProcedure::from_json(json, create_ctx.clone())
                        .map(|procedure| Box::new(procedure) as _)
                }),
            )
            .context(RegisterProcedureLoaderSnafu {
                type_name: CreateTableProcedure::TYPE_NAME,
            })?;

        // Drop table.
        let drop_ctx = self.create_context();
        self.procedure_manager
            .register_loader(
                DropTableProcedure::TYPE_NAME,
                Box::new(move |json| {
                    DropTableProcedure::from_json(json, drop_ctx.clone())
                        .map(|procedure| Box::new(procedure) as _)
                }),
            )
            .context(RegisterProcedureLoaderSnafu {
                type_name: DropTableProcedure::TYPE_NAME,
            })?;

        // Alter table.
        let alter_ctx = self.create_context();
        self.procedure_manager
            .register_loader(
                AlterTableProcedure::TYPE_NAME,
                Box::new(move |json| {
                    AlterTableProcedure::from_json(json, alter_ctx.clone())
                        .map(|procedure| Box::new(procedure) as _)
                }),
            )
            .context(RegisterProcedureLoaderSnafu {
                type_name: AlterTableProcedure::TYPE_NAME,
            })?;

        Ok(())
    }

    /// Wraps the arguments in an [`AlterTableProcedure`], submits it under a
    /// random procedure id and waits for it; returns that id.
    pub async fn submit_alter_table_task(
        &self,
        cluster_id: u64,
        alter_table_task: AlterTableTask,
        alter_table_request: AlterTableRequest,
        table_info_value: TableInfoValue,
    ) -> Result<ProcedureId> {
        let procedure = AlterTableProcedure::new(
            cluster_id,
            alter_table_task,
            alter_table_request,
            table_info_value,
            self.create_context(),
        );

        self.submit_procedure(ProcedureWithId::with_random_id(Box::new(procedure)))
            .await
    }

    /// Wraps the arguments in a [`CreateTableProcedure`], submits it under a
    /// random procedure id and waits for it; returns that id.
    pub async fn submit_create_table_task(
        &self,
        cluster_id: u64,
        create_table_task: CreateTableTask,
        region_routes: Vec<RegionRoute>,
    ) -> Result<ProcedureId> {
        let procedure = CreateTableProcedure::new(
            cluster_id,
            create_table_task,
            region_routes,
            self.create_context(),
        );

        self.submit_procedure(ProcedureWithId::with_random_id(Box::new(procedure)))
            .await
    }

    /// Wraps the arguments in a [`DropTableProcedure`], submits it under a
    /// random procedure id and waits for it; returns that id.
    pub async fn submit_drop_table_task(
        &self,
        cluster_id: u64,
        drop_table_task: DropTableTask,
        table_info_value: TableInfoValue,
        table_route_value: TableRouteValue,
    ) -> Result<ProcedureId> {
        // The procedure constructor takes the route value before the info value.
        let procedure = DropTableProcedure::new(
            cluster_id,
            drop_table_task,
            table_route_value,
            table_info_value,
            self.create_context(),
        );

        self.submit_procedure(ProcedureWithId::with_random_id(Box::new(procedure)))
            .await
    }

    /// Truncation has no procedure: logs the rejected request and always
    /// fails with an `Unsupported` error.
    pub async fn submit_truncate_table_task(
        &self,
        cluster_id: u64,
        truncate_table_task: TruncateTableTask,
        region_routes: Vec<RegionRoute>,
    ) -> Result<ProcedureId> {
        error!("Truncate table procedure is not supported, cluster_id = {}, truncate_table_task = {:?}, region_routes = {:?}",
            cluster_id, truncate_table_task, region_routes);

        let unsupported = UnsupportedSnafu {
            operation: "TRUNCATE TABLE",
        };
        unsupported.fail()
    }

    /// Submits `procedure_with_id` to the procedure manager and waits on the
    /// returned watcher before handing back the procedure's id.
    ///
    /// # Errors
    ///
    /// `SubmitProcedure` when submission fails, `WaitProcedure` when waiting fails.
    async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
        // Grab the id first: the procedure itself is moved into `submit`.
        let procedure_id = procedure_with_id.id;

        let submitted = self.procedure_manager.submit(procedure_with_id).await;
        let mut procedure_watcher = submitted.context(SubmitProcedureSnafu)?;

        let waited = watcher::wait(&mut procedure_watcher).await;
        waited.context(WaitProcedureSnafu)?;

        Ok(procedure_id)
    }
}
/// Handles a truncate-table task: resolves the table's region routes from the
/// table route metadata and forwards everything to
/// `DdlManager::submit_truncate_table_task`.
///
/// # Errors
///
/// `Unexpected` when the task carries no table id, `TableRouteNotFound` when
/// no route is stored for the table, plus any error from the metadata lookup
/// or from the submission itself.
async fn handle_truncate_table_task(
    ddl_manager: &DdlManager,
    cluster_id: u64,
    truncate_table_task: TruncateTableTask,
) -> Result<SubmitDdlTaskResponse> {
    let table_id = truncate_table_task
        .truncate_table
        .table_id
        .as_ref()
        .context(error::UnexpectedSnafu {
            err_msg: "expected table id ",
        })?
        .id;

    let table_ref = truncate_table_task.table_ref();

    let region_routes = ddl_manager
        .table_metadata_manager()
        .table_route_manager()
        .get(table_id)
        .await?
        .with_context(|| error::TableRouteNotFoundSnafu {
            table_name: table_ref.to_string(),
        })?
        .region_routes;

    let procedure_id = ddl_manager
        .submit_truncate_table_task(cluster_id, truncate_table_task, region_routes)
        .await?;

    let key = procedure_id.to_string().into_bytes();
    Ok(SubmitDdlTaskResponse {
        key,
        ..Default::default()
    })
}
async fn handle_alter_table_task(
ddl_manager: &DdlManager,
cluster_id: u64,
mut alter_table_task: AlterTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_id = alter_table_task
.alter_table
.table_id
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "expected table id ",
})?
.id;
let mut alter_table_request =
alter_expr_to_request(table_id, alter_table_task.alter_table.clone())
.context(error::ConvertGrpcExprSnafu)?;
let table_ref = alter_table_task.table_ref();
let table_info_value = ddl_manager
.table_metadata_manager()
.table_info_manager()
.get(table_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_info = &table_info_value.table_info;
// Sets alter_table's table_version
alter_table_task.alter_table.table_version = table_info.ident.version;
alter_table_request.table_version = Some(table_info.ident.version);
let id = ddl_manager
.submit_alter_table_task(
cluster_id,
alter_table_task,
alter_table_request,
table_info_value,
)
.await?;
info!("Table: {table_id} is altered via procedure_id {id:?}");
Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
..Default::default()
})
}
async fn handle_drop_table_task(
ddl_manager: &DdlManager,
cluster_id: u64,
drop_table_task: DropTableTask,
) -> Result<SubmitDdlTaskResponse> {
let table_id = drop_table_task.table_id;
let table_metadata_manager = &ddl_manager.table_metadata_manager();
let table_ref = drop_table_task.table_ref();
let (table_info_value, table_route_value) =
table_metadata_manager.get_full_table_info(table_id).await?;
let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let id = ddl_manager
.submit_drop_table_task(
cluster_id,
drop_table_task,
table_info_value,
table_route_value,
)
.await?;
info!("Table: {table_id} is dropped via procedure_id {id:?}");
Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
..Default::default()
})
}
/// Handles a create-table task: asks the manager's table creator for the
/// table id and region routes, then submits the create procedure.
///
/// The creator receives the task's table info mutably, so the task submitted
/// afterwards carries whatever the creator changed.
///
/// The response contains the procedure id as `key` and the new table id.
async fn handle_create_table_task(
    ddl_manager: &DdlManager,
    cluster_id: u64,
    mut create_table_task: CreateTableTask,
) -> Result<SubmitDdlTaskResponse> {
    let creator_ctx = TableCreatorContext { cluster_id };

    let (table_id, region_routes) = ddl_manager
        .table_creator
        .create(
            &creator_ctx,
            &mut create_table_task.table_info,
            &create_table_task.partitions,
        )
        .await?;

    let id = ddl_manager
        .submit_create_table_task(cluster_id, create_table_task, region_routes)
        .await?;

    info!("Table: {table_id} is created via procedure_id {id:?}");

    let key = id.to_string().into_bytes();
    Ok(SubmitDdlTaskResponse {
        key,
        table_id: Some(table_id),
    })
}
/// Routes each submitted DDL task to the matching `handle_*_task` function.
#[async_trait::async_trait]
impl DdlExecutor for DdlManager {
    /// Dispatches `request.task` by variant.
    ///
    /// # Errors
    ///
    /// Fails with `Unexpected` ("cluster_id not found") before anything else
    /// happens when `ctx.cluster_id` is `None`; otherwise returns whatever the
    /// selected handler returns.
    async fn submit_ddl_task(
        &self,
        ctx: &ExecutorContext,
        request: SubmitDdlTaskRequest,
    ) -> Result<SubmitDdlTaskResponse> {
        let cluster_id = ctx.cluster_id.context(error::UnexpectedSnafu {
            err_msg: "cluster_id not found",
        })?;

        let task = request.task;
        info!("Submitting Ddl task: {:?}", task);

        let response = match task {
            CreateTable(create_table_task) => {
                handle_create_table_task(self, cluster_id, create_table_task).await
            }
            DropTable(drop_table_task) => {
                handle_drop_table_task(self, cluster_id, drop_table_task).await
            }
            AlterTable(alter_table_task) => {
                handle_alter_table_task(self, cluster_id, alter_table_task).await
            }
            TruncateTable(truncate_table_task) => {
                handle_truncate_table_task(self, cluster_id, truncate_table_task).await
            }
        };
        response
    }
}

View File

@@ -26,6 +26,47 @@ use crate::peer::Peer;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to convert grpc expr, source: {}", source))]
ConvertGrpcExpr {
location: Location,
source: common_grpc_expr::error::Error,
},
#[snafu(display("Table info not found: {}", table_name))]
TableInfoNotFound {
table_name: String,
location: Location,
},
#[snafu(display(
"Failed to register procedure loader, type name: {}, source: {}",
type_name,
source
))]
RegisterProcedureLoader {
type_name: String,
location: Location,
source: common_procedure::error::Error,
},
#[snafu(display("Failed to submit procedure, source: {source}"))]
SubmitProcedure {
location: Location,
source: common_procedure::Error,
},
#[snafu(display("Unsupported operation {}, location: {}", operation, location))]
Unsupported {
operation: String,
location: Location,
},
#[snafu(display("Failed to wait procedure done, source: {source}"))]
WaitProcedure {
location: Location,
source: common_procedure::Error,
},
#[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))]
ConvertRawTableInfo {
location: Location,
@@ -224,6 +265,8 @@ impl ErrorExt for Error {
| MoveRegion { .. }
| Unexpected { .. }
| External { .. }
| ConvertGrpcExpr { .. }
| TableInfoNotFound { .. }
| InvalidHeartbeatResponse { .. } => StatusCode::Unexpected,
SendMessage { .. }
@@ -231,7 +274,8 @@ impl ErrorExt for Error {
| CacheNotGet { .. }
| CatalogAlreadyExists { .. }
| SchemaAlreadyExists { .. }
| RenameTable { .. } => StatusCode::Internal,
| RenameTable { .. }
| Unsupported { .. } => StatusCode::Internal,
PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
@@ -247,6 +291,8 @@ impl ErrorExt for Error {
| TableRouteNotFound { .. }
| ConvertRawTableInfo { .. } => StatusCode::Unexpected,
SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),
Table { source, .. } => source.status_code(),
RetryLater { source, .. } => source.status_code(),

View File

@@ -123,6 +123,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
}
}
#[derive(Debug, Default)]
pub struct SubmitDdlTaskResponse {
pub key: Vec<u8>,
pub table_id: Option<TableId>,
@@ -140,6 +141,18 @@ impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {
}
}
/// Converts the crate-level response into its protobuf counterpart.
impl From<SubmitDdlTaskResponse> for PbSubmitDdlTaskResponse {
    /// Copies `key` across, wraps the optional table id in the protobuf
    /// `TableId` message, and leaves every other protobuf field at its default.
    fn from(val: SubmitDdlTaskResponse) -> Self {
        let table_id = val.table_id.map(|id| api::v1::meta::TableId { id });

        Self {
            key: val.key,
            table_id,
            ..Default::default()
        }
    }
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct DropTableTask {
pub catalog: String,