From ee16262b45a35d185a1a98093eaa3f15897fed59 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 4 Jul 2023 23:24:43 +0900 Subject: [PATCH] feat: add create table procedure (#1845) * feat: add create table procedure * feat: change table_info type from vec u8 to RawTableInfo * feat: return create table status * fix: fix uncaught error * refactor: use a notifier to respond to callers * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: add comment * chore: apply suggestions from CR * refacotr: make CreateMetadata step after DatanodeCreateTable step --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/common/meta/Cargo.toml | 1 + src/common/meta/src/rpc.rs | 1 + src/common/meta/src/rpc/ddl.rs | 217 ++++++++++++++ src/common/meta/src/table_name.rs | 9 + src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/ddl.rs | 110 +++++++ src/meta-srv/src/error.rs | 65 +++- src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 6 + src/meta-srv/src/metasrv/builder.rs | 12 + src/meta-srv/src/procedure.rs | 1 + src/meta-srv/src/procedure/create_table.rs | 330 +++++++++++++++++++++ src/meta-srv/src/service.rs | 1 + src/meta-srv/src/service/ddl.rs | 187 ++++++++++++ src/meta-srv/src/service/router.rs | 6 +- src/mito/src/error.rs | 3 +- src/table/src/test_util.rs | 1 + src/table/src/test_util/table_info.rs | 50 ++++ 20 files changed, 999 insertions(+), 9 deletions(-) create mode 100644 src/common/meta/src/rpc/ddl.rs create mode 100644 src/meta-srv/src/ddl.rs create mode 100644 src/meta-srv/src/procedure/create_table.rs create mode 100644 src/meta-srv/src/service/ddl.rs create mode 100644 src/table/src/test_util/table_info.rs diff --git a/Cargo.lock b/Cargo.lock index 99f7cf6fb8..2c9f2c024e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,6 +1813,7 @@ dependencies = [ "common-time", "datatypes", "futures", + "prost", "serde", "serde_json", "snafu", @@ -4108,7 +4109,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/WenyXu/greptime-proto.git?rev=aab7d9a35900f995f9328c8588781e4d75253cba#aab7d9a35900f995f9328c8588781e4d75253cba" +source = "git+https://github.com/WenyXu/greptime-proto.git?rev=1eda4691a5d2c8ffc463d48ca2317905ba7e4b2d#1eda4691a5d2c8ffc463d48ca2317905ba7e4b2d" dependencies = [ "prost", "serde", @@ -5169,6 +5170,7 @@ dependencies = [ "async-trait", "catalog", "chrono", + "client", "common-base", "common-catalog", "common-error", diff --git a/Cargo.toml b/Cargo.toml index 415cc378d4..2cb1a1f048 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" } futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/WenyXu/greptime-proto.git", rev = "aab7d9a35900f995f9328c8588781e4d75253cba" } +greptime-proto = { git = "https://github.com/WenyXu/greptime-proto.git", rev = "1eda4691a5d2c8ffc463d48ca2317905ba7e4b2d" } itertools = "0.10" parquet = "40.0" paste = "1.0" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 26bee6c672..f96c2eb412 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -14,6 +14,7 @@ common-runtime = { path = "../runtime" } common-telemetry = { path = "../telemetry" } common-time = { path = "../time" } futures.workspace = true +prost.workspace = true serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/src/common/meta/src/rpc.rs b/src/common/meta/src/rpc.rs index 8225b9330e..e87528e734 100644 --- a/src/common/meta/src/rpc.rs +++ b/src/common/meta/src/rpc.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod ddl; pub mod lock; pub mod router; pub mod store; diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs new file mode 100644 index 0000000000..83a5abbb1c --- /dev/null +++ b/src/common/meta/src/rpc/ddl.rs @@ -0,0 +1,217 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::result; + +use api::v1::meta::submit_ddl_task_request::Task; +use api::v1::meta::{ + CreateTableTask as PbCreateTableTask, Partition, + SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, + SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, +}; +use api::v1::CreateTableExpr; +use prost::Message; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use table::engine::TableReference; +use table::metadata::{RawTableInfo, TableId}; + +use crate::error::{self, Result}; +use crate::table_name::TableName; + +#[derive(Debug)] +pub enum DdlTask { + CreateTable(CreateTableTask), +} + +impl DdlTask { + pub fn new_create_table( + expr: CreateTableExpr, + partitions: Vec, + table_info: RawTableInfo, + ) -> Self { + DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info)) + } +} + +impl TryFrom for DdlTask { + type Error = error::Error; + fn try_from(task: Task) -> Result { + match task { + Task::CreateTableTask(create_table) => { + Ok(DdlTask::CreateTable(create_table.try_into()?)) + } + } + } +} + +pub struct SubmitDdlTaskRequest { + pub task: DdlTask, +} + +impl TryFrom for PbSubmitDdlTaskRequest { + type Error = error::Error; + + fn try_from(request: SubmitDdlTaskRequest) -> Result { + let task = match request.task { + DdlTask::CreateTable(task) => Task::CreateTableTask(PbCreateTableTask { + table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?, + create_table: Some(task.create_table), + partitions: task.partitions, + }), + }; + Ok(Self { + header: None, + task: Some(task), + }) + } +} + +pub struct SubmitDdlTaskResponse { + pub key: Vec, + pub table_id: TableId, +} + +impl TryFrom for SubmitDdlTaskResponse { + type Error = error::Error; + + fn try_from(resp: PbSubmitDdlTaskResponse) -> Result { + let table_id = resp.table_id.context(error::InvalidProtoMsgSnafu { + err_msg: "expected table_id", + })?; + Ok(Self { + key: resp.key, + table_id: table_id.id, + }) + } +} + +#[derive(Debug, PartialEq)] +pub struct CreateTableTask { + pub create_table: CreateTableExpr, + pub partitions: Vec, + pub table_info: RawTableInfo, +} + +impl TryFrom for CreateTableTask { + type Error = error::Error; + + fn try_from(pb: PbCreateTableTask) -> Result { + let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?; + + Ok(CreateTableTask::new( + pb.create_table.context(error::InvalidProtoMsgSnafu { + err_msg: "expected create table", + })?, + pb.partitions, + table_info, + )) + } +} + +impl CreateTableTask { + pub fn new( + expr: CreateTableExpr, + partitions: Vec, + table_info: RawTableInfo, + ) -> CreateTableTask { + CreateTableTask { + create_table: expr, + partitions, + table_info, + } + } + + pub fn table_name(&self) -> TableName { + let table = &self.create_table; + + TableName { + catalog_name: table.catalog_name.to_string(), + schema_name: table.schema_name.to_string(), + table_name: table.table_name.to_string(), + } + } + + pub fn table_ref(&self) -> TableReference { + let table = &self.create_table; + + TableReference { + catalog: &table.catalog_name, + schema: &table.schema_name, + table: &table.table_name, + } + } +} + +impl Serialize for CreateTableTask { + fn serialize(&self, serializer: S) -> result::Result + where + S: serde::Serializer, + { + let table_info = serde_json::to_vec(&self.table_info) + .map_err(|err| serde::ser::Error::custom(err.to_string()))?; + + let pb = PbCreateTableTask { + create_table: Some(self.create_table.clone()), + partitions: self.partitions.clone(), + table_info, + }; + let buf = pb.encode_to_vec(); + serializer.serialize_bytes(&buf) + } +} + +impl<'de> Deserialize<'de> for CreateTableTask { + fn deserialize(deserializer: D) -> result::Result + where + D: serde::Deserializer<'de>, + { + let buf = Vec::::deserialize(deserializer)?; + let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + + let expr = CreateTableTask::try_from(expr) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + + Ok(expr) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::CreateTableExpr; + use datatypes::schema::SchemaBuilder; + use table::metadata::RawTableInfo; + use table::test_util::table_info::test_table_info; + + use super::CreateTableTask; + + #[test] + fn test_basic_ser_de_create_table_task() { + let schema = SchemaBuilder::default().build().unwrap(); + let table_info = test_table_info(1025, "foo", "bar", "baz", Arc::new(schema)); + let task = CreateTableTask::new( + CreateTableExpr::default(), + Vec::new(), + RawTableInfo::from(table_info), + ); + + let output = serde_json::to_vec(&task).unwrap(); + + let de = serde_json::from_slice(&output).unwrap(); + assert_eq!(task, de); + } +} diff --git a/src/common/meta/src/table_name.rs b/src/common/meta/src/table_name.rs index 78734466a7..fde0efd5ab 100644 --- a/src/common/meta/src/table_name.rs +++ b/src/common/meta/src/table_name.rs @@ -16,6 +16,7 @@ use std::fmt::{Display, Formatter}; use api::v1::meta::TableName as PbTableName; use serde::{Deserialize, Serialize}; +use table::engine::TableReference; #[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)] pub struct TableName { @@ -46,6 +47,14 @@ impl TableName { table_name: table_name.into(), } } + + pub fn table_ref(&self) -> TableReference<'_> { + TableReference { + catalog: &self.catalog_name, + schema: &self.schema_name, + table: &self.table_name, + } + } } impl From for PbTableName { diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index ca50243ed9..27f8d89103 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -13,6 +13,7 @@ api = { path = "../api" } async-stream.workspace = true async-trait = "0.1" catalog = { path = "../catalog" } +client = { path = "../client" } common-base = { path = "../common/base" } common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs new file mode 100644 index 0000000000..a465328b06 --- /dev/null +++ b/src/meta-srv/src/ddl.rs @@ -0,0 +1,110 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use client::client_manager::DatanodeClients; +use common_meta::rpc::ddl::CreateTableTask; +use common_meta::rpc::router::TableRoute; +use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::procedure::create_table::CreateTableProcedure; +use crate::service::store::kv::KvStoreRef; + +pub type DdlManagerRef = Arc; + +pub struct DdlManager { + procedure_manager: ProcedureManagerRef, + kv_store: KvStoreRef, + datanode_clients: Arc, +} + +// TODO(weny): removes in following PRs. +#[allow(unused)] +#[derive(Clone)] +pub(crate) struct DdlContext { + pub(crate) kv_store: KvStoreRef, + pub(crate) datanode_clients: Arc, +} + +impl DdlManager { + pub(crate) fn new( + procedure_manager: ProcedureManagerRef, + kv_store: KvStoreRef, + datanode_clients: Arc, + ) -> Self { + Self { + procedure_manager, + kv_store, + datanode_clients, + } + } + + pub(crate) fn create_context(&self) -> DdlContext { + DdlContext { + kv_store: self.kv_store.clone(), + datanode_clients: self.datanode_clients.clone(), + } + } + + pub(crate) fn try_start(&self) -> Result<()> { + let context = self.create_context(); + + self.procedure_manager + .register_loader( + CreateTableProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + CreateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(error::RegisterProcedureLoaderSnafu { + type_name: CreateTableProcedure::TYPE_NAME, + }) + } + + pub async fn submit_create_table_task( + &self, + cluster_id: u64, + create_table_task: CreateTableTask, + table_route: TableRoute, + ) -> Result { + let context = self.create_context(); + + let procedure = + CreateTableProcedure::new(cluster_id, create_table_task, table_route, context); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + + async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result { + let procedure_id = procedure_with_id.id; + + let mut watcher = self + .procedure_manager + .submit(procedure_with_id) + .await + .context(error::SubmitProcedureSnafu)?; + + watcher::wait(&mut watcher) + .await + .context(error::WaitProcedureSnafu)?; + + Ok(procedure_id) + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 1e5c56f809..4f94d921f7 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -13,14 +13,54 @@ // limitations under the License. use common_error::prelude::*; +use common_meta::peer::Peer; use snafu::Location; use tokio::sync::mpsc::error::SendError; +use tokio::sync::oneshot::error::TryRecvError; use tonic::codegen::http; use tonic::Code; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to execute transaction: {}", msg))] + Txn { location: Location, msg: String }, + + #[snafu(display( + "Unexpected table_id changed, expected: {}, found: {}", + expected, + found, + ))] + TableIdChanged { + location: Location, + expected: u64, + found: u64, + }, + + #[snafu(display("Failed to receive status, source: {}", source,))] + TryReceiveStatus { + location: Location, + source: TryRecvError, + }, + + #[snafu(display( + "Failed to request Datanode, expected: {}, but only {} available", + expected, + available + ))] + NoEnoughAvailableDatanode { + location: Location, + expected: usize, + available: usize, + }, + + #[snafu(display("Failed to request Datanode {}, source: {}", peer, source))] + RequestDatanode { + location: Location, + peer: Peer, + source: client::Error, + }, + #[snafu(display("Failed to send shutdown signal"))] SendShutdownSignal { source: SendError<()> }, @@ -274,6 +314,18 @@ pub enum Error { source: common_procedure::Error, }, + #[snafu(display("Failed to recover procedure, source: {source}"))] + WaitProcedure { + location: Location, + source: common_procedure::Error, + }, + + #[snafu(display("Failed to submit procedure, source: {source}"))] + SubmitProcedure { + location: Location, + source: common_procedure::Error, + }, + #[snafu(display("Schema already exists, name: {schema_name}"))] SchemaAlreadyExists { schema_name: String, @@ -413,7 +465,9 @@ impl ErrorExt for Error { | Error::MailboxReceiver { .. } | Error::RetryLater { .. } | Error::StartGrpc { .. } - | Error::Combine { .. } => StatusCode::Internal, + | Error::Combine { .. } + | Error::NoEnoughAvailableDatanode { .. } + | Error::TryReceiveStatus { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } | Error::MissingRequestHeader { .. } @@ -437,10 +491,15 @@ impl ErrorExt for Error { | Error::InvalidUtf8Value { .. } | Error::UnexpectedInstructionReply { .. } | Error::EtcdTxnOpResponse { .. } - | Error::Unexpected { .. } => StatusCode::Unexpected, + | Error::Unexpected { .. } + | Error::Txn { .. } + | Error::TableIdChanged { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, + Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } => source.status_code(), - Error::RecoverProcedure { source, .. } => source.status_code(), + Error::RecoverProcedure { source, .. } + | Error::SubmitProcedure { source, .. } + | Error::WaitProcedure { source, .. } => source.status_code(), Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => { source.status_code() } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 01648f3429..94dcd78218 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -17,6 +17,7 @@ pub mod bootstrap; pub mod cluster; +pub mod ddl; pub mod election; pub mod error; mod failure_detector; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 7280c0a751..bb685ff168 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -28,6 +28,7 @@ use snafu::ResultExt; use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; +use crate::ddl::DdlManagerRef; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{RecoverProcedureSnafu, Result}; use crate::handler::HeartbeatHandlerGroup; @@ -135,6 +136,7 @@ pub struct MetaSrv { procedure_manager: ProcedureManagerRef, metadata_service: MetadataServiceRef, mailbox: MailboxRef, + ddl_manager: DdlManagerRef, } impl MetaSrv { @@ -276,6 +278,10 @@ impl MetaSrv { } #[inline] + pub fn ddl_manager(&self) -> &DdlManagerRef { + &self.ddl_manager + } + pub fn procedure_manager(&self) -> &ProcedureManagerRef { &self.procedure_manager } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index e75b8cacbc..38720bf7b5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -15,9 +15,11 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; +use client::client_manager::DatanodeClients; use common_procedure::local::{LocalManager, ManagerConfig}; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; +use crate::ddl::DdlManager; use crate::error::Result; use crate::handler::mailbox_handler::MailboxHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; @@ -210,6 +212,15 @@ impl MetaSrvBuilder { } }; + // TODO(weny): considers to modify the default config of procedure manager + let ddl_manager = Arc::new(DdlManager::new( + procedure_manager.clone(), + kv_store.clone(), + Arc::new(DatanodeClients::default()), + )); + + let _ = ddl_manager.try_start(); + Ok(MetaSrv { started, options, @@ -225,6 +236,7 @@ impl MetaSrvBuilder { procedure_manager, metadata_service, mailbox, + ddl_manager, }) } } diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index a7c4bc73b2..f819ccc11c 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod create_table; pub mod region_failover; pub(crate) mod state_store; diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs new file mode 100644 index 0000000000..75f9eb1b52 --- /dev/null +++ b/src/meta-srv/src/procedure/create_table.rs @@ -0,0 +1,330 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::TableRouteValue; +use async_trait::async_trait; +use catalog::helper::TableGlobalKey; +use client::Database; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_meta::key::TableRouteKey; +use common_meta::rpc::ddl::CreateTableTask; +use common_meta::rpc::router::TableRoute; +use common_meta::table_name::TableName; +use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, +}; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use table::engine::TableReference; + +use crate::ddl::DdlContext; +use crate::error::{self, Result}; +use crate::service::router::create_table_global_value; +use crate::service::store::txn::{Compare, CompareOp, Txn, TxnOp}; +use crate::table_routes::get_table_global_value; + +// TODO(weny): removes in following PRs. +#[allow(unused)] +pub struct CreateTableProcedure { + context: DdlContext, + creator: TableCreator, +} + +// TODO(weny): removes in following PRs. +#[allow(dead_code)] +impl CreateTableProcedure { + pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; + + pub(crate) fn new( + cluster_id: u64, + task: CreateTableTask, + table_route: TableRoute, + context: DdlContext, + ) -> Self { + Self { + context, + creator: TableCreator::new(cluster_id, task, table_route), + } + } + + pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(CreateTableProcedure { + context, + creator: TableCreator { data }, + }) + } + + fn global_table_key(&self) -> TableGlobalKey { + let table_ref = self.creator.data.table_ref(); + + TableGlobalKey { + catalog_name: table_ref.catalog.to_string(), + schema_name: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + } + } + + fn table_name(&self) -> TableName { + self.creator.data.task.table_name() + } + + /// Checks whether the table exists. + async fn on_prepare(&mut self) -> Result { + if (get_table_global_value(&self.context.kv_store, &self.global_table_key()).await?) + .is_some() + { + ensure!( + self.creator.data.task.create_table.create_if_not_exists, + error::TableAlreadyExistsSnafu { + table_name: self.creator.data.table_ref().to_string(), + } + ); + + return Ok(Status::Done); + } + + self.creator.data.state = CreateTableState::DatanodeCreateTable; + + Ok(Status::executing(true)) + } + + /// registers the `TableRouteValue`,`TableGlobalValue` + async fn register_metadata(&self) -> Result<()> { + let table_name = self.table_name(); + + let table_id = self.creator.data.table_route.table.id; + + let table_route_key = TableRouteKey::with_table_name(table_id, &table_name.clone().into()) + .key() + .into_bytes(); + + let table_global_key = TableGlobalKey { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + } + .to_string() + .into_bytes(); + + let (peers, table_route) = self + .creator + .data + .table_route + .clone() + .try_into_raw() + .context(error::ConvertProtoDataSnafu)?; + + let table_route_value = TableRouteValue { + peers, + table_route: Some(table_route), + }; + + let table_global_value = create_table_global_value( + &table_route_value, + self.creator.data.task.table_info.clone(), + )? + .as_bytes() + .context(error::InvalidCatalogValueSnafu)?; + + let txn = Txn::new() + .when(vec![ + Compare::with_not_exist_value(table_route_key.clone(), CompareOp::Equal), + Compare::with_not_exist_value(table_global_key.clone(), CompareOp::Equal), + ]) + .and_then(vec![ + TxnOp::Put(table_route_key, table_route_value.into()), + TxnOp::Put(table_global_key, table_global_value), + ]); + + let resp = self.context.kv_store.txn(txn).await?; + + ensure!( + resp.succeeded, + error::TxnSnafu { + msg: "table_route_key or table_global_key exists" + } + ); + + Ok(()) + } + + async fn on_create_metadata(&mut self) -> Result { + let kv_store = &self.context.kv_store; + let key = &self.global_table_key(); + + match get_table_global_value(kv_store, key).await? { + Some(table_global_value) => { + // The metasrv crashed after metadata was created immediately. + // Recovers table_route from kv. + let table_id = table_global_value.table_id() as u64; + + let expected = self.creator.data.table_route.table.id; + // If there is something like: + // Create table A, Create table A(from another Fe, Somehow, Failed), Renames table A to B, Create table A(Recovered). + // We must ensure the table_id isn't changed. + ensure!( + table_id == expected, + error::TableIdChangedSnafu { + expected, + found: table_id + } + ); + } + None => { + // registers metadata + self.register_metadata().await?; + } + } + + Ok(Status::Done) + } + + async fn on_datanode_create_table(&mut self) -> Result { + let table_route = &self.creator.data.table_route; + + let table_name = self.table_name(); + let clients = self.context.datanode_clients.clone(); + let leaders = table_route.find_leaders(); + let mut joins = Vec::with_capacity(leaders.len()); + + for datanode in leaders { + let client = clients.get_client(&datanode).await; + let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client); + + let regions = table_route.find_leader_regions(&datanode); + let mut create_expr_for_region = self.creator.data.task.create_table.clone(); + create_expr_for_region.region_numbers = regions; + + joins.push(common_runtime::spawn_bg(async move { + if let Err(err) = client + .create(create_expr_for_region) + .await + .context(error::RequestDatanodeSnafu { peer: datanode }) + { + // TODO(weny): add tests for `TableAlreadyExists` + if err.status_code() != StatusCode::TableAlreadyExists { + return Err(err); + } + } + Ok(()) + })); + } + + let _ = join_all(joins) + .await + .into_iter() + .map(|result| { + result.map_err(|err| { + error::RetryLaterSnafu { + reason: format!( + "Failed to execute create table on datanode, source: {}", + err + ), + } + .build() + }) + }) + .collect::>>()?; + + self.creator.data.state = CreateTableState::CreateMetadata; + + Ok(Status::executing(true)) + } +} + +#[async_trait] +impl Procedure for CreateTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let error_handler = |e| { + if matches!(e, error::Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + }; + match self.creator.data.state { + CreateTableState::Prepare => self.on_prepare().await.map_err(error_handler), + CreateTableState::DatanodeCreateTable => { + self.on_datanode_create_table().await.map_err(error_handler) + } + CreateTableState::CreateMetadata => { + self.on_create_metadata().await.map_err(error_handler) + } + } + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.creator.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = &self.creator.data.table_ref(); + let key = common_catalog::format_full_table_name( + table_ref.catalog, + table_ref.schema, + table_ref.table, + ); + + LockKey::single(key) + } +} + +pub struct TableCreator { + data: CreateTableData, +} + +impl TableCreator { + pub fn new(cluster_id: u64, task: CreateTableTask, table_route: TableRoute) -> Self { + Self { + data: CreateTableData { + state: CreateTableState::Prepare, + cluster_id, + task, + table_route, + }, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +enum CreateTableState { + /// Prepares to create the table + Prepare, + /// Datanode creates the table + DatanodeCreateTable, + /// Creates metadata + CreateMetadata, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateTableData { + state: CreateTableState, + task: CreateTableTask, + table_route: TableRoute, + cluster_id: u64, +} + +impl CreateTableData { + fn table_ref(&self) -> TableReference<'_> { + self.task.table_ref() + } +} diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index 70f95fbd25..0e68db8eae 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -19,6 +19,7 @@ use tonic::{Response, Status}; pub mod admin; pub mod cluster; +pub mod ddl; mod heartbeat; pub mod lock; pub mod mailbox; diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs new file mode 100644 index 0000000000..878db48cf7 --- /dev/null +++ b/src/meta-srv/src/service/ddl.rs @@ -0,0 +1,187 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{ + ddl_task_server, Partition, Region, RegionRoute, SubmitDdlTaskRequest, SubmitDdlTaskResponse, + Table, TableRoute, +}; +use api::v1::TableId; +use common_meta::rpc::ddl::{CreateTableTask, DdlTask}; +use common_meta::rpc::router; +use common_meta::table_name::TableName; +use common_telemetry::{info, warn}; +use snafu::{OptionExt, ResultExt}; +use table::metadata::RawTableInfo; +use tonic::{Request, Response}; + +use super::GrpcResult; +use crate::ddl::DdlManagerRef; +use crate::error::{self, Result}; +use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef}; +use crate::sequence::SequenceRef; + +#[async_trait::async_trait] +impl ddl_task_server::DdlTask for MetaSrv { + async fn submit_ddl_task( + &self, + request: Request, + ) -> GrpcResult { + let SubmitDdlTaskRequest { header, task, .. } = request.into_inner(); + + let header = header.context(error::MissingRequestHeaderSnafu)?; + let task: DdlTask = task + .context(error::MissingRequiredParameterSnafu { param: "task" })? + .try_into() + .context(error::ConvertProtoDataSnafu)?; + + let ctx = SelectorContext { + datanode_lease_secs: self.options().datanode_lease_secs, + server_addr: self.options().server_addr.clone(), + kv_store: self.kv_store(), + meta_peer_client: self.meta_peer_client(), + catalog: None, + schema: None, + table: None, + }; + + let resp = match task { + DdlTask::CreateTable(create_table_task) => { + handle_create_table_task( + header.cluster_id, + create_table_task, + ctx, + self.selector().clone(), + self.table_id_sequence().clone(), + self.ddl_manager().clone(), + ) + .await? + } + }; + + Ok(Response::new(resp)) + } +} + +async fn handle_create_table_task( + cluster_id: u64, + mut create_table_task: CreateTableTask, + ctx: SelectorContext, + selector: SelectorRef, + table_id_sequence: SequenceRef, + ddl_manager: DdlManagerRef, +) -> Result { + let table_name = create_table_task.table_name(); + + let ctx = SelectorContext { + datanode_lease_secs: ctx.datanode_lease_secs, + server_addr: ctx.server_addr, + kv_store: ctx.kv_store, + meta_peer_client: ctx.meta_peer_client, + catalog: Some(table_name.catalog_name.clone()), + schema: Some(table_name.schema_name.clone()), + table: Some(table_name.table_name.clone()), + }; + + let partitions = create_table_task + .partitions + .clone() + .into_iter() + .map(Into::into) + .collect(); + + let table_route = handle_create_table_route( + cluster_id, + table_name, + partitions, + &mut create_table_task.table_info, + ctx, + selector, + table_id_sequence, + ) + .await?; + let table_id = table_route.table.id; + + let id = ddl_manager + .submit_create_table_task(cluster_id, create_table_task, table_route) + .await?; + + info!("Table: {table_id} created via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + table_id: Some(TableId { + id: table_id as u32, + }), + ..Default::default() + }) +} + +/// pre-calculates create table task's metadata. +async fn handle_create_table_route( + cluster_id: u64, + table_name: TableName, + partitions: Vec, + table_info: &mut RawTableInfo, + ctx: SelectorContext, + selector: SelectorRef, + table_id_sequence: SequenceRef, +) -> Result { + let mut peers = selector.select(cluster_id, &ctx).await?; + + if peers.len() < partitions.len() { + warn!("Create table failed due to no enough available datanodes, table: {table_name:?}, partition number: {}, datanode number: {}", partitions.len(), peers.len()); + return error::NoEnoughAvailableDatanodeSnafu { + expected: partitions.len(), + available: peers.len(), + } + .fail(); + } + + // We don't need to keep all peers, just truncate it to the number of partitions. + // If the peers are not enough, some peers will be used for multiple partitions. + peers.truncate(partitions.len()); + + let id = table_id_sequence.next().await?; + table_info.ident.table_id = id as u32; + + let table = Table { + id, + table_name: Some(table_name.into()), + ..Default::default() + }; + + let region_routes = partitions + .into_iter() + .enumerate() + .map(|(i, partition)| { + let region = Region { + id: i as u64, + partition: Some(partition), + ..Default::default() + }; + RegionRoute { + region: Some(region), + leader_peer_index: (i % peers.len()) as u64, + follower_peer_indexes: vec![], // follower_peers is not supported at the moment + } + }) + .collect::>(); + + let table_route = TableRoute { + table: Some(table), + region_routes, + }; + + router::TableRoute::try_from_raw(&peers, table_route).context(error::TableRouteConversionSnafu) +} diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index d5dbddc866..1e107241b7 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -258,7 +258,7 @@ async fn handle_create( }) } -fn create_table_global_value( +pub(crate) fn create_table_global_value( table_route_value: &TableRouteValue, table_info: RawTableInfo, ) -> Result { @@ -349,7 +349,7 @@ async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result, ) -> Result<(Vec, Vec)> { let mut peer_dict = PeerDict::default(); @@ -407,7 +407,7 @@ async fn fetch_tables( Ok(tables) } -fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> { +pub(crate) fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> { TableRouteKey { table_id, catalog_name: &t.catalog_name, diff --git a/src/mito/src/error.rs b/src/mito/src/error.rs index 3e6844da55..9bbb609203 100644 --- a/src/mito/src/error.rs +++ b/src/mito/src/error.rs @@ -183,7 +183,6 @@ impl ErrorExt for Error { | BuildTableMeta { .. } | BuildTableInfo { .. } | BuildRegionDescriptor { .. } - | TableExists { .. } | ProjectedColumnNotFound { .. } | InvalidPrimaryKey { .. } | MissingTimestampIndex { .. } @@ -191,6 +190,8 @@ impl ErrorExt for Error { | InvalidRawSchema { .. } | VersionChanged { .. } => StatusCode::InvalidArguments, + TableExists { .. } => StatusCode::TableAlreadyExists, + ConvertRaw { .. } => StatusCode::Unexpected, ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable, diff --git a/src/table/src/test_util.rs b/src/table/src/test_util.rs index 59737db02f..884f176029 100644 --- a/src/table/src/test_util.rs +++ b/src/table/src/test_util.rs @@ -15,6 +15,7 @@ mod empty_table; mod memtable; mod mock_engine; +pub mod table_info; pub use empty_table::EmptyTable; pub use memtable::MemTable; diff --git a/src/table/src/test_util/table_info.rs b/src/table/src/test_util/table_info.rs new file mode 100644 index 0000000000..ae061ccb02 --- /dev/null +++ b/src/table/src/test_util/table_info.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datatypes::schema::SchemaRef; + +use crate::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}; + +pub fn test_table_info( + table_id: u32, + table_name: &str, + schema_name: &str, + catalog_name: &str, + schema: SchemaRef, +) -> TableInfo { + let meta = TableMetaBuilder::default() + .schema(schema) + .primary_key_indices(vec![]) + .value_indices(vec![]) + .engine("mito".to_string()) + .next_column_id(0) + .engine_options(Default::default()) + .options(Default::default()) + .created_on(Default::default()) + .region_numbers(vec![1]) + .build() + .unwrap(); + + TableInfoBuilder::default() + .table_id(table_id) + .table_version(0 as TableVersion) + .name(table_name.to_string()) + .schema_name(schema_name.to_string()) + .catalog_name(catalog_name.to_string()) + .desc(None) + .table_type(TableType::Base) + .meta(meta) + .build() + .unwrap() +}