diff --git a/Cargo.lock b/Cargo.lock index e2bc1ce016..822f3ee276 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1844,6 +1844,7 @@ dependencies = [ "chrono", "common-catalog", "common-error", + "common-grpc-expr", "common-procedure", "common-runtime", "common-telemetry", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 8e9ae24a1d..238488639d 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -10,6 +10,7 @@ async-stream.workspace = true async-trait.workspace = true common-catalog = { workspace = true } common-error = { workspace = true } +common-grpc-expr = { workspace = true } common-procedure = { workspace = true } common-runtime = { workspace = true } common-telemetry = { workspace = true } diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 5f8e147af8..220274100f 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -12,15 +12,56 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use api::v1::meta::Partition; +use store_api::storage::TableId; +use table::metadata::RawTableInfo; + use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; +use crate::error::Result; use crate::key::TableMetadataManagerRef; +use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use crate::rpc::router::RegionRoute; pub mod alter_table; pub mod create_table; pub mod drop_table; pub mod utils; +#[derive(Debug, Default)] +pub struct ExecutorContext { + pub cluster_id: Option, +} + +#[async_trait::async_trait] +pub trait DdlExecutor: Send + Sync { + async fn submit_ddl_task( + &self, + ctx: &ExecutorContext, + request: SubmitDdlTaskRequest, + ) -> Result; +} + +pub type DdlExecutorRef = Arc; + +pub struct TableCreatorContext { + pub cluster_id: u64, +} + +#[async_trait::async_trait] +pub trait TableCreator: Send + Sync { + async fn create( + &self, + ctx: &TableCreatorContext, + table_info: &mut RawTableInfo, + partitions: &[Partition], + ) -> Result<(TableId, Vec)>; +} + +pub type TableCreatorRef = Arc; + #[derive(Clone)] pub struct DdlContext { pub datanode_manager: DatanodeManagerRef, diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 020d4c8c09..2df1505b8b 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -14,18 +14,383 @@ use std::sync::Arc; -use crate::error::Result; -use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use common_grpc_expr::alter_expr_to_request; +use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; +use common_telemetry::{error, info}; +use snafu::{OptionExt, ResultExt}; +use table::requests::AlterTableRequest; -pub struct Context; +use crate::cache_invalidator::CacheInvalidatorRef; +use crate::datanode_manager::DatanodeManagerRef; +use crate::ddl::alter_table::AlterTableProcedure; +use crate::ddl::create_table::CreateTableProcedure; +use crate::ddl::drop_table::DropTableProcedure; +use crate::ddl::{DdlContext, DdlExecutor, ExecutorContext, TableCreatorContext, TableCreatorRef}; +use crate::error::{ + self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, UnsupportedSnafu, + WaitProcedureSnafu, +}; +use crate::key::table_info::TableInfoValue; +use crate::key::table_route::TableRouteValue; +use crate::key::TableMetadataManagerRef; +use crate::rpc::ddl::DdlTask::{AlterTable, CreateTable, DropTable, TruncateTable}; +use crate::rpc::ddl::{ + AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, + TruncateTableTask, +}; +use crate::rpc::router::RegionRoute; -#[async_trait::async_trait] -pub trait DdlExecutor: Send + Sync { - async fn submit_ddl_task( - &self, - ctx: &Context, - request: SubmitDdlTaskRequest, - ) -> Result; +pub type DdlManagerRef = Arc; + +pub struct DdlManager { + procedure_manager: ProcedureManagerRef, + datanode_manager: DatanodeManagerRef, + cache_invalidator: CacheInvalidatorRef, + table_metadata_manager: TableMetadataManagerRef, + table_creator: TableCreatorRef, } -pub type DdlExecutorRef = Arc; +impl DdlManager { + pub fn new( + procedure_manager: ProcedureManagerRef, + datanode_clients: DatanodeManagerRef, + cache_invalidator: CacheInvalidatorRef, + table_metadata_manager: TableMetadataManagerRef, + table_creator: TableCreatorRef, + ) -> Self { + Self { + procedure_manager, + datanode_manager: datanode_clients, + cache_invalidator, + table_metadata_manager, + table_creator, + } + } + + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } + + pub fn create_context(&self) -> DdlContext { + DdlContext { + datanode_manager: self.datanode_manager.clone(), + cache_invalidator: self.cache_invalidator.clone(), + table_metadata_manager: self.table_metadata_manager.clone(), + } + } + + pub fn try_start(&self) -> Result<()> { + let context = self.create_context(); + + self.procedure_manager + .register_loader( + CreateTableProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + CreateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(RegisterProcedureLoaderSnafu { + type_name: CreateTableProcedure::TYPE_NAME, + })?; + + let context = self.create_context(); + + self.procedure_manager + .register_loader( + DropTableProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + DropTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(RegisterProcedureLoaderSnafu { + type_name: DropTableProcedure::TYPE_NAME, + })?; + + let context = self.create_context(); + + self.procedure_manager + .register_loader( + AlterTableProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + AlterTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(RegisterProcedureLoaderSnafu { + type_name: AlterTableProcedure::TYPE_NAME, + }) + } + + pub async fn submit_alter_table_task( + &self, + cluster_id: u64, + alter_table_task: AlterTableTask, + alter_table_request: AlterTableRequest, + table_info_value: TableInfoValue, + ) -> Result { + let context = self.create_context(); + + let procedure = AlterTableProcedure::new( + cluster_id, + alter_table_task, + alter_table_request, + table_info_value, + context, + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + + pub async fn submit_create_table_task( + &self, + cluster_id: u64, + create_table_task: CreateTableTask, + region_routes: Vec, + ) -> Result { + let context = self.create_context(); + + let procedure = + CreateTableProcedure::new(cluster_id, create_table_task, region_routes, context); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + + pub async fn submit_drop_table_task( + &self, + cluster_id: u64, + drop_table_task: DropTableTask, + table_info_value: TableInfoValue, + table_route_value: TableRouteValue, + ) -> Result { + let context = self.create_context(); + + let procedure = DropTableProcedure::new( + cluster_id, + drop_table_task, + table_route_value, + table_info_value, + context, + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + + pub async fn submit_truncate_table_task( + &self, + cluster_id: u64, + truncate_table_task: TruncateTableTask, + region_routes: Vec, + ) -> Result { + error!("Truncate table procedure is not supported, cluster_id = {}, truncate_table_task = {:?}, region_routes = {:?}", + cluster_id, truncate_table_task, region_routes); + + UnsupportedSnafu { + operation: "TRUNCATE TABLE", + } + .fail() + } + + async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result { + let procedure_id = procedure_with_id.id; + + let mut watcher = self + .procedure_manager + .submit(procedure_with_id) + .await + .context(SubmitProcedureSnafu)?; + + watcher::wait(&mut watcher) + .await + .context(WaitProcedureSnafu)?; + + Ok(procedure_id) + } +} + +async fn handle_truncate_table_task( + ddl_manager: &DdlManager, + cluster_id: u64, + truncate_table_task: TruncateTableTask, +) -> Result { + let truncate_table = &truncate_table_task.truncate_table; + let table_id = truncate_table + .table_id + .as_ref() + .context(error::UnexpectedSnafu { + err_msg: "expected table id ", + })? + .id; + + let table_ref = truncate_table_task.table_ref(); + + let table_route_value = ddl_manager + .table_metadata_manager() + .table_route_manager() + .get(table_id) + .await? + .with_context(|| error::TableRouteNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let table_route = table_route_value.region_routes; + + let id = ddl_manager + .submit_truncate_table_task(cluster_id, truncate_table_task, table_route) + .await?; + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + ..Default::default() + }) +} + +async fn handle_alter_table_task( + ddl_manager: &DdlManager, + cluster_id: u64, + mut alter_table_task: AlterTableTask, +) -> Result { + let table_id = alter_table_task + .alter_table + .table_id + .as_ref() + .context(error::UnexpectedSnafu { + err_msg: "expected table id ", + })? + .id; + + let mut alter_table_request = + alter_expr_to_request(table_id, alter_table_task.alter_table.clone()) + .context(error::ConvertGrpcExprSnafu)?; + + let table_ref = alter_table_task.table_ref(); + + let table_info_value = ddl_manager + .table_metadata_manager() + .table_info_manager() + .get(table_id) + .await? + .with_context(|| error::TableInfoNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let table_info = &table_info_value.table_info; + + // Sets alter_table's table_version + alter_table_task.alter_table.table_version = table_info.ident.version; + alter_table_request.table_version = Some(table_info.ident.version); + + let id = ddl_manager + .submit_alter_table_task( + cluster_id, + alter_table_task, + alter_table_request, + table_info_value, + ) + .await?; + + info!("Table: {table_id} is altered via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + ..Default::default() + }) +} + +async fn handle_drop_table_task( + ddl_manager: &DdlManager, + cluster_id: u64, + drop_table_task: DropTableTask, +) -> Result { + let table_id = drop_table_task.table_id; + let table_metadata_manager = &ddl_manager.table_metadata_manager(); + let table_ref = drop_table_task.table_ref(); + + let (table_info_value, table_route_value) = + table_metadata_manager.get_full_table_info(table_id).await?; + + let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + let id = ddl_manager + .submit_drop_table_task( + cluster_id, + drop_table_task, + table_info_value, + table_route_value, + ) + .await?; + + info!("Table: {table_id} is dropped via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + ..Default::default() + }) +} + +async fn handle_create_table_task( + ddl_manager: &DdlManager, + cluster_id: u64, + mut create_table_task: CreateTableTask, +) -> Result { + let (table_id, region_routes) = ddl_manager + .table_creator + .create( + &TableCreatorContext { cluster_id }, + &mut create_table_task.table_info, + &create_table_task.partitions, + ) + .await?; + + let id = ddl_manager + .submit_create_table_task(cluster_id, create_table_task, region_routes) + .await?; + + info!("Table: {table_id} is created via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + table_id: Some(table_id), + }) +} + +#[async_trait::async_trait] +impl DdlExecutor for DdlManager { + async fn submit_ddl_task( + &self, + ctx: &ExecutorContext, + request: SubmitDdlTaskRequest, + ) -> Result { + let cluster_id = ctx.cluster_id.context(error::UnexpectedSnafu { + err_msg: "cluster_id not found", + })?; + info!("Submitting Ddl task: {:?}", request.task); + match request.task { + CreateTable(create_table_task) => { + handle_create_table_task(self, cluster_id, create_table_task).await + } + DropTable(drop_table_task) => { + handle_drop_table_task(self, cluster_id, drop_table_task).await + } + AlterTable(alter_table_task) => { + handle_alter_table_task(self, cluster_id, alter_table_task).await + } + TruncateTable(truncate_table_task) => { + handle_truncate_table_task(self, cluster_id, truncate_table_task).await + } + } + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 06f51857c1..6eab1bd79a 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -26,6 +26,47 @@ use crate::peer::Peer; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to convert grpc expr, source: {}", source))] + ConvertGrpcExpr { + location: Location, + source: common_grpc_expr::error::Error, + }, + + #[snafu(display("Table info not found: {}", table_name))] + TableInfoNotFound { + table_name: String, + location: Location, + }, + + #[snafu(display( + "Failed to register procedure loader, type name: {}, source: {}", + type_name, + source + ))] + RegisterProcedureLoader { + type_name: String, + location: Location, + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to submit procedure, source: {source}"))] + SubmitProcedure { + location: Location, + source: common_procedure::Error, + }, + + #[snafu(display("Unsupported operation {}, location: {}", operation, location))] + Unsupported { + operation: String, + location: Location, + }, + + #[snafu(display("Failed to wait procedure done, source: {source}"))] + WaitProcedure { + location: Location, + source: common_procedure::Error, + }, + #[snafu(display("Failed to convert RawTableInfo into TableInfo: {}", source))] ConvertRawTableInfo { location: Location, @@ -224,6 +265,8 @@ impl ErrorExt for Error { | MoveRegion { .. } | Unexpected { .. } | External { .. } + | ConvertGrpcExpr { .. } + | TableInfoNotFound { .. } | InvalidHeartbeatResponse { .. } => StatusCode::Unexpected, SendMessage { .. } @@ -231,7 +274,8 @@ impl ErrorExt for Error { | CacheNotGet { .. } | CatalogAlreadyExists { .. } | SchemaAlreadyExists { .. } - | RenameTable { .. } => StatusCode::Internal, + | RenameTable { .. } + | Unsupported { .. } => StatusCode::Internal, PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments, @@ -247,6 +291,8 @@ impl ErrorExt for Error { | TableRouteNotFound { .. } | ConvertRawTableInfo { .. } => StatusCode::Unexpected, + SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(), + RegisterProcedureLoader { source, .. } => source.status_code(), OperateDatanode { source, .. } => source.status_code(), Table { source, .. } => source.status_code(), RetryLater { source, .. } => source.status_code(), diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index e12ee2abd0..d1ca0dea94 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -123,6 +123,7 @@ impl TryFrom for PbSubmitDdlTaskRequest { } } +#[derive(Debug, Default)] pub struct SubmitDdlTaskResponse { pub key: Vec, pub table_id: Option, @@ -140,6 +141,18 @@ impl TryFrom for SubmitDdlTaskResponse { } } +impl From for PbSubmitDdlTaskResponse { + fn from(val: SubmitDdlTaskResponse) -> Self { + Self { + key: val.key, + table_id: val + .table_id + .map(|table_id| api::v1::meta::TableId { id: table_id }), + ..Default::default() + } + } +} + #[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct DropTableTask { pub catalog: String, diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index d7ee060986..4a31b2e623 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -31,7 +31,7 @@ use chrono::DateTime; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; -use common_meta::ddl_manager::{Context, DdlExecutorRef}; +use common_meta::ddl::{DdlExecutorRef, ExecutorContext}; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition}; @@ -441,7 +441,7 @@ impl DistInstance { }; self.ddl_executor - .submit_ddl_task(&Context, req) + .submit_ddl_task(&ExecutorContext::default(), req) .await .context(error::ExecuteDdlSnafu)?; @@ -466,7 +466,7 @@ impl DistInstance { }; self.ddl_executor - .submit_ddl_task(&Context, request) + .submit_ddl_task(&ExecutorContext::default(), request) .await .context(error::ExecuteDdlSnafu) } @@ -486,7 +486,7 @@ impl DistInstance { }; self.ddl_executor - .submit_ddl_task(&Context, request) + .submit_ddl_task(&ExecutorContext::default(), request) .await .context(error::ExecuteDdlSnafu) } @@ -500,7 +500,7 @@ impl DistInstance { }; self.ddl_executor - .submit_ddl_task(&Context, request) + .submit_ddl_task(&ExecutorContext::default(), request) .await .context(error::ExecuteDdlSnafu) } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index f8e4d10348..0648ff282f 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -23,7 +23,7 @@ mod store; use api::v1::meta::Role; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::ddl_manager::{Context, DdlExecutor}; +use common_meta::ddl::{DdlExecutor, ExecutorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; @@ -181,7 +181,7 @@ pub struct MetaClient { impl DdlExecutor for MetaClient { async fn submit_ddl_task( &self, - _ctx: &Context, + _ctx: &ExecutorContext, request: SubmitDdlTaskRequest, ) -> MetaResult { self.submit_ddl_task(request) diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs deleted file mode 100644 index ddb20883ba..0000000000 --- a/src/meta-srv/src/ddl.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use client::client_manager::DatanodeClients; -use common_meta::cache_invalidator::CacheInvalidatorRef; -use common_meta::ddl::alter_table::AlterTableProcedure; -use common_meta::ddl::create_table::CreateTableProcedure; -use common_meta::ddl::drop_table::DropTableProcedure; -use common_meta::ddl::DdlContext; -use common_meta::key::table_info::TableInfoValue; -use common_meta::key::table_route::TableRouteValue; -use common_meta::key::TableMetadataManagerRef; -use common_meta::rpc::ddl::{AlterTableTask, CreateTableTask, DropTableTask, TruncateTableTask}; -use common_meta::rpc::router::RegionRoute; -use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; -use common_telemetry::error; -use snafu::ResultExt; -use table::requests::AlterTableRequest; - -use crate::error::{ - RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, UnsupportedSnafu, - WaitProcedureSnafu, -}; - -pub type DdlManagerRef = Arc; - -pub struct DdlManager { - procedure_manager: ProcedureManagerRef, - datanode_clients: Arc, - - pub(crate) cache_invalidator: CacheInvalidatorRef, - pub(crate) table_metadata_manager: TableMetadataManagerRef, -} - -impl DdlManager { - pub(crate) fn new( - procedure_manager: ProcedureManagerRef, - datanode_clients: Arc, - cache_invalidator: CacheInvalidatorRef, - table_metadata_manager: TableMetadataManagerRef, - ) -> Self { - Self { - procedure_manager, - datanode_clients, - cache_invalidator, - table_metadata_manager, - } - } - - pub(crate) fn create_context(&self) -> DdlContext { - DdlContext { - datanode_manager: self.datanode_clients.clone(), - cache_invalidator: self.cache_invalidator.clone(), - table_metadata_manager: self.table_metadata_manager.clone(), - } - } - - pub(crate) fn try_start(&self) -> Result<()> { - let context = self.create_context(); - - self.procedure_manager - .register_loader( - CreateTableProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - CreateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: CreateTableProcedure::TYPE_NAME, - })?; - - let context = self.create_context(); - - self.procedure_manager - .register_loader( - DropTableProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - DropTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: DropTableProcedure::TYPE_NAME, - })?; - - let context = self.create_context(); - - self.procedure_manager - .register_loader( - AlterTableProcedure::TYPE_NAME, - Box::new(move |json| { - let context = context.clone(); - AlterTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(RegisterProcedureLoaderSnafu { - type_name: AlterTableProcedure::TYPE_NAME, - }) - } - - pub async fn submit_alter_table_task( - &self, - cluster_id: u64, - alter_table_task: AlterTableTask, - alter_table_request: AlterTableRequest, - table_info_value: TableInfoValue, - ) -> Result { - let context = self.create_context(); - - let procedure = AlterTableProcedure::new( - cluster_id, - alter_table_task, - alter_table_request, - table_info_value, - context, - ); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - - self.submit_procedure(procedure_with_id).await - } - - pub async fn submit_create_table_task( - &self, - cluster_id: u64, - create_table_task: CreateTableTask, - region_routes: Vec, - ) -> Result { - let context = self.create_context(); - - let procedure = - CreateTableProcedure::new(cluster_id, create_table_task, region_routes, context); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - - self.submit_procedure(procedure_with_id).await - } - - pub async fn submit_drop_table_task( - &self, - cluster_id: u64, - drop_table_task: DropTableTask, - table_info_value: TableInfoValue, - table_route_value: TableRouteValue, - ) -> Result { - let context = self.create_context(); - - let procedure = DropTableProcedure::new( - cluster_id, - drop_table_task, - table_route_value, - table_info_value, - context, - ); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - - self.submit_procedure(procedure_with_id).await - } - - pub async fn submit_truncate_table_task( - &self, - cluster_id: u64, - truncate_table_task: TruncateTableTask, - region_routes: Vec, - ) -> Result { - error!("truncate table procedure is not supported, cluster_id = {}, truncate_table_task = {:?}, region_routes = {:?}", - cluster_id, truncate_table_task, region_routes); - - UnsupportedSnafu { - operation: "TRUNCATE TABLE", - } - .fail() - } - - async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result { - let procedure_id = procedure_with_id.id; - - let mut watcher = self - .procedure_manager - .submit(procedure_with_id) - .await - .context(SubmitProcedureSnafu)?; - - watcher::wait(&mut watcher) - .await - .context(WaitProcedureSnafu)?; - - Ok(procedure_id) - } -} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index b5d2f34b64..9a0a1fb2ea 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,12 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to submit ddl task: {}", source))] + SubmitDdlTask { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to invalidate table cache: {}", source))] InvalidateTableCache { location: Location, @@ -58,12 +64,6 @@ pub enum Error { source: JoinError, }, - #[snafu(display("Failed to convert grpc expr, source: {}", source))] - ConvertGrpcExpr { - location: Location, - source: common_grpc_expr::error::Error, - }, - #[snafu(display("Failed to execute transaction: {}", msg))] Txn { location: Location, msg: String }, @@ -554,7 +554,6 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::UpdateTableMetadata { .. } | Error::NoEnoughAvailableDatanode { .. } - | Error::ConvertGrpcExpr { .. } | Error::PublishMessage { .. } | Error::Join { .. } | Error::Unsupported { .. } => StatusCode::Internal, @@ -608,7 +607,7 @@ impl ErrorExt for Error { Error::RegisterProcedureLoader { source, .. } => source.status_code(), Error::OperateRegion { source, .. } => source.status_code(), - + Error::SubmitDdlTask { source, .. } => source.status_code(), Error::TableRouteConversion { source, .. } | Error::ConvertProtoData { source, .. } | Error::TableMetadataManager { source, .. } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index c8ebdefd9f..4a5d86c9d3 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -18,7 +18,6 @@ pub mod bootstrap; mod cache_invalidator; pub mod cluster; -pub mod ddl; pub mod election; pub mod error; mod failure_detector; @@ -36,6 +35,7 @@ pub mod pubsub; pub mod selector; mod sequence; pub mod service; +pub mod table_creator; pub mod table_routes; pub use crate::error::Result; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 0d0bc017cb..1807654fdf 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -22,6 +22,7 @@ use api::v1::meta::Peer; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; +use common_meta::ddl::DdlExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; @@ -33,7 +34,6 @@ use snafu::ResultExt; use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; -use crate::ddl::DdlManagerRef; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; @@ -198,7 +198,7 @@ pub struct MetaSrv { procedure_manager: ProcedureManagerRef, metadata_service: MetadataServiceRef, mailbox: MailboxRef, - ddl_manager: DdlManagerRef, + ddl_executor: DdlExecutorRef, table_metadata_manager: TableMetadataManagerRef, greptimedb_telemetry_task: Arc, pubsub: Option<(PublishRef, SubscribeManagerRef)>, @@ -349,8 +349,8 @@ impl MetaSrv { &self.mailbox } - pub fn ddl_manager(&self) -> &DdlManagerRef { - &self.ddl_manager + pub fn ddl_executor(&self) -> &DdlExecutorRef { + &self.ddl_executor } pub fn procedure_manager(&self) -> &ProcedureManagerRef { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 146f8f4c31..92b92356d6 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,13 +18,13 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_grpc::channel_manager::ChannelConfig; +use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; @@ -49,11 +49,12 @@ use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::lease_based::LeaseBasedSelector; -use crate::sequence::Sequence; +use crate::sequence::{Sequence, SequenceRef}; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; use crate::service::store::memory::MemStore; +use crate::table_creator::MetaSrvTableCreator; // TODO(fys): try use derive_builder macro pub struct MetaSrvBuilder { @@ -178,12 +179,23 @@ impl MetaSrvBuilder { .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); let table_metadata_manager = build_table_metadata_manager(&kv_store); + let ctx = SelectorContext { + datanode_lease_secs: options.datanode_lease_secs, + server_addr: options.server_addr.clone(), + kv_store: kv_store.clone(), + meta_peer_client: meta_peer_client.clone(), + catalog: None, + schema: None, + table: None, + }; let ddl_manager = build_ddl_manager( &options, datanode_clients, &procedure_manager, &mailbox, &table_metadata_manager, + (&selector, &ctx), + &table_id_sequence, ); let _ = ddl_manager.try_start(); @@ -266,7 +278,7 @@ impl MetaSrvBuilder { procedure_manager, metadata_service, mailbox, - ddl_manager, + ddl_executor: ddl_manager, table_metadata_manager, greptimedb_telemetry_task: get_greptimedb_telemetry_task( Some(metasrv_home), @@ -329,6 +341,8 @@ fn build_ddl_manager( procedure_manager: &ProcedureManagerRef, mailbox: &MailboxRef, table_metadata_manager: &TableMetadataManagerRef, + (selector, selector_ctx): (&SelectorRef, &SelectorContext), + table_id_sequence: &SequenceRef, ) -> DdlManagerRef { let datanode_clients = datanode_clients.unwrap_or_else(|| { let datanode_client_channel_config = ChannelConfig::new() @@ -347,12 +361,19 @@ fn build_ddl_manager( server_addr: options.server_addr.clone(), }, )); - // TODO(weny): considers to modify the default config of procedure manager + + let table_creator = Arc::new(MetaSrvTableCreator::new( + selector_ctx.clone(), + selector.clone(), + table_id_sequence.clone(), + )); + Arc::new(DdlManager::new( procedure_manager.clone(), datanode_clients, cache_invalidator, table_metadata_manager.clone(), + table_creator, )) } diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs index 18c1fd4f10..fd5afbe979 100644 --- a/src/meta-srv/src/service/ddl.rs +++ b/src/meta-srv/src/service/ddl.rs @@ -13,326 +13,45 @@ // limitations under the License. use api::v1::meta::{ - ddl_task_server, Partition, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TableId, + ddl_task_server, SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, + SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, }; -use common_grpc_expr::alter_expr_to_request; -use common_meta::rpc::ddl::{ - AlterTableTask, CreateTableTask, DdlTask, DropTableTask, TruncateTableTask, -}; -use common_meta::rpc::router::{Region, RegionRoute}; -use common_meta::table_name::TableName; -use common_telemetry::{info, warn}; -use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::{RegionId, MAX_REGION_SEQ}; -use table::metadata::RawTableInfo; +use common_meta::ddl::ExecutorContext; +use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; +use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; use super::GrpcResult; -use crate::ddl::DdlManagerRef; -use crate::error::{self, Result, TableMetadataManagerSnafu, TooManyPartitionsSnafu}; -use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef}; -use crate::sequence::SequenceRef; +use crate::error::{self}; +use crate::metasrv::MetaSrv; #[async_trait::async_trait] impl ddl_task_server::DdlTask for MetaSrv { async fn submit_ddl_task( &self, - request: Request, - ) -> GrpcResult { - let SubmitDdlTaskRequest { header, task, .. } = request.into_inner(); + request: Request, + ) -> GrpcResult { + let PbSubmitDdlTaskRequest { header, task, .. } = request.into_inner(); let header = header.context(error::MissingRequestHeaderSnafu)?; + let cluster_id = header.cluster_id; let task: DdlTask = task .context(error::MissingRequiredParameterSnafu { param: "task" })? .try_into() .context(error::ConvertProtoDataSnafu)?; - let ctx = SelectorContext { - datanode_lease_secs: self.options().datanode_lease_secs, - server_addr: self.options().server_addr.clone(), - kv_store: self.kv_store().clone(), - meta_peer_client: self.meta_peer_client().clone(), - catalog: None, - schema: None, - table: None, - }; - - let resp = match task { - DdlTask::CreateTable(create_table_task) => { - handle_create_table_task( - header.cluster_id, - create_table_task, - ctx, - self.selector().clone(), - self.table_id_sequence().clone(), - self.ddl_manager().clone(), - ) - .await? - } - DdlTask::DropTable(drop_table_task) => { - handle_drop_table_task( - header.cluster_id, - drop_table_task, - self.ddl_manager().clone(), - ) - .await? - } - DdlTask::AlterTable(alter_table_task) => { - handle_alter_table_task( - header.cluster_id, - alter_table_task, - self.ddl_manager().clone(), - ) - .await? - } - DdlTask::TruncateTable(truncate_table_task) => { - handle_truncate_table_task( - header.cluster_id, - truncate_table_task, - self.ddl_manager().clone(), - ) - .await? - } - }; + let resp = self + .ddl_executor() + .submit_ddl_task( + &ExecutorContext { + cluster_id: Some(cluster_id), + }, + SubmitDdlTaskRequest { task }, + ) + .await + .context(error::SubmitDdlTaskSnafu)? + .into(); Ok(Response::new(resp)) } } - -async fn handle_create_table_task( - cluster_id: u64, - mut create_table_task: CreateTableTask, - ctx: SelectorContext, - selector: SelectorRef, - table_id_sequence: SequenceRef, - ddl_manager: DdlManagerRef, -) -> Result { - let table_name = create_table_task.table_name(); - - let ctx = SelectorContext { - datanode_lease_secs: ctx.datanode_lease_secs, - server_addr: ctx.server_addr, - kv_store: ctx.kv_store, - meta_peer_client: ctx.meta_peer_client, - catalog: Some(table_name.catalog_name.clone()), - schema: Some(table_name.schema_name.clone()), - table: Some(table_name.table_name.clone()), - }; - - let partitions = create_table_task - .partitions - .clone() - .into_iter() - .map(Into::into) - .collect(); - - let region_routes = handle_create_region_routes( - cluster_id, - table_name, - partitions, - &mut create_table_task.table_info, - ctx, - selector, - table_id_sequence, - ) - .await?; - - let table_id = create_table_task.table_info.ident.table_id; - - // TODO(weny): refactor the table route. - let id = ddl_manager - .submit_create_table_task(cluster_id, create_table_task, region_routes) - .await?; - - info!("Table: {table_id} is created via procedure_id {id:?}"); - - Ok(SubmitDdlTaskResponse { - key: id.to_string().into(), - table_id: Some(TableId { id: table_id }), - ..Default::default() - }) -} - -/// pre-calculates create table task's metadata. -async fn handle_create_region_routes( - cluster_id: u64, - table_name: TableName, - partitions: Vec, - table_info: &mut RawTableInfo, - ctx: SelectorContext, - selector: SelectorRef, - table_id_sequence: SequenceRef, -) -> Result> { - let mut peers = selector.select(cluster_id, &ctx).await?; - - if peers.len() < partitions.len() { - warn!("Create table failed due to no enough available datanodes, table: {table_name:?}, partition number: {}, datanode number: {}", partitions.len(), peers.len()); - return error::NoEnoughAvailableDatanodeSnafu { - expected: partitions.len(), - available: peers.len(), - } - .fail(); - } - - // We don't need to keep all peers, just truncate it to the number of partitions. - // If the peers are not enough, some peers will be used for multiple partitions. - peers.truncate(partitions.len()); - - let table_id = table_id_sequence.next().await? as u32; - table_info.ident.table_id = table_id; - - ensure!( - partitions.len() <= MAX_REGION_SEQ as usize, - TooManyPartitionsSnafu - ); - - let region_routes = partitions - .into_iter() - .enumerate() - .map(|(i, partition)| { - let region = Region { - id: RegionId::new(table_id, i as u32), - partition: Some(partition.into()), - ..Default::default() - }; - let peer = peers[i % peers.len()].clone(); - RegionRoute { - region, - leader_peer: Some(peer.into()), - follower_peers: vec![], // follower_peers is not supported at the moment - } - }) - .collect::>(); - - Ok(region_routes) -} - -async fn handle_drop_table_task( - cluster_id: u64, - drop_table_task: DropTableTask, - ddl_manager: DdlManagerRef, -) -> Result { - let table_id = drop_table_task.table_id; - let table_metadata_manager = &ddl_manager.table_metadata_manager; - let table_ref = drop_table_task.table_ref(); - - let (table_info_value, table_route_value) = table_metadata_manager - .get_full_table_info(table_id) - .await - .context(error::TableMetadataManagerSnafu)?; - - let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - let id = ddl_manager - .submit_drop_table_task( - cluster_id, - drop_table_task, - table_info_value, - table_route_value, - ) - .await?; - - info!("Table: {table_id} is dropped via procedure_id {id:?}"); - - Ok(SubmitDdlTaskResponse { - key: id.to_string().into(), - ..Default::default() - }) -} - -async fn handle_alter_table_task( - cluster_id: u64, - mut alter_table_task: AlterTableTask, - ddl_manager: DdlManagerRef, -) -> Result { - let table_id = alter_table_task - .alter_table - .table_id - .as_ref() - .context(error::UnexpectedSnafu { - violated: "expected table id ", - })? - .id; - - let mut alter_table_request = - alter_expr_to_request(table_id, alter_table_task.alter_table.clone()) - .context(error::ConvertGrpcExprSnafu)?; - - let table_ref = alter_table_task.table_ref(); - - let table_info_value = ddl_manager - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .with_context(|| error::TableInfoNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - let table_info = &table_info_value.table_info; - - // Sets alter_table's table_version - alter_table_task.alter_table.table_version = table_info.ident.version; - alter_table_request.table_version = Some(table_info.ident.version); - - let id = ddl_manager - .submit_alter_table_task( - cluster_id, - alter_table_task, - alter_table_request, - table_info_value, - ) - .await?; - - info!("Table: {table_id} is altered via procedure_id {id:?}"); - - Ok(SubmitDdlTaskResponse { - key: id.to_string().into(), - ..Default::default() - }) -} - -async fn handle_truncate_table_task( - cluster_id: u64, - truncate_table_task: TruncateTableTask, - ddl_manager: DdlManagerRef, -) -> Result { - let truncate_table = &truncate_table_task.truncate_table; - let table_id = truncate_table - .table_id - .as_ref() - .context(error::UnexpectedSnafu { - violated: "expected table id ", - })? - .id; - - let table_ref = truncate_table_task.table_ref(); - - let table_route_value = ddl_manager - .table_metadata_manager - .table_route_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .with_context(|| error::TableRouteNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - let table_route = table_route_value.region_routes; - - let id = ddl_manager - .submit_truncate_table_task(cluster_id, truncate_table_task, table_route) - .await?; - - Ok(SubmitDdlTaskResponse { - key: id.to_string().into(), - ..Default::default() - }) -} diff --git a/src/meta-srv/src/table_creator.rs b/src/meta-srv/src/table_creator.rs new file mode 100644 index 0000000000..c54afe4b82 --- /dev/null +++ b/src/meta-srv/src/table_creator.rs @@ -0,0 +1,123 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::Partition; +use common_catalog::format_full_table_name; +use common_error::ext::BoxedError; +use common_meta::ddl::{TableCreator, TableCreatorContext}; +use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::rpc::router::{Region, RegionRoute}; +use common_telemetry::warn; +use snafu::{ensure, ResultExt}; +use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ}; +use table::metadata::RawTableInfo; + +use crate::error::{self, Result, TooManyPartitionsSnafu}; +use crate::metasrv::{SelectorContext, SelectorRef}; +use crate::sequence::SequenceRef; + +pub struct MetaSrvTableCreator { + ctx: SelectorContext, + selector: SelectorRef, + table_id_sequence: SequenceRef, +} + +impl MetaSrvTableCreator { + pub fn new( + ctx: SelectorContext, + selector: SelectorRef, + table_id_sequence: SequenceRef, + ) -> Self { + Self { + ctx, + selector, + table_id_sequence, + } + } +} + +#[async_trait::async_trait] +impl TableCreator for MetaSrvTableCreator { + async fn create( + &self, + ctx: &TableCreatorContext, + raw_table_info: &mut RawTableInfo, + partitions: &[Partition], + ) -> MetaResult<(TableId, Vec)> { + handle_create_region_routes( + ctx.cluster_id, + raw_table_info, + partitions, + &self.ctx, + &self.selector, + &self.table_id_sequence, + ) + .await + .map_err(BoxedError::new) + .context(meta_error::MetaSrvSnafu) + } +} + +/// pre-allocates create table's table id and region routes. +async fn handle_create_region_routes( + cluster_id: u64, + table_info: &mut RawTableInfo, + partitions: &[Partition], + ctx: &SelectorContext, + selector: &SelectorRef, + table_id_sequence: &SequenceRef, +) -> Result<(TableId, Vec)> { + let mut peers = selector.select(cluster_id, ctx).await?; + + if peers.len() < partitions.len() { + warn!("Create table failed due to no enough available datanodes, table: {}, partition number: {}, datanode number: {}", format_full_table_name(&table_info.catalog_name,&table_info.schema_name,&table_info.name), partitions.len(), peers.len()); + return error::NoEnoughAvailableDatanodeSnafu { + expected: partitions.len(), + available: peers.len(), + } + .fail(); + } + + // We don't need to keep all peers, just truncate it to the number of partitions. + // If the peers are not enough, some peers will be used for multiple partitions. + peers.truncate(partitions.len()); + + let table_id = table_id_sequence.next().await? as u32; + table_info.ident.table_id = table_id; + + ensure!( + partitions.len() <= MAX_REGION_SEQ as usize, + TooManyPartitionsSnafu + ); + + let region_routes = partitions + .iter() + .enumerate() + .map(|(i, partition)| { + let region = Region { + id: RegionId::new(table_id, i as u32), + partition: Some(partition.clone().into()), + ..Default::default() + }; + let peer = peers[i % peers.len()].clone(); + RegionRoute { + region, + leader_peer: Some(peer.into()), + follower_peers: vec![], // follower_peers is not supported at the moment + } + }) + .collect::>(); + + Ok((table_id, region_routes)) +}