feat: add create table procedure (#1845)

* feat: add create table procedure

* feat: change table_info type from vec u8 to RawTableInfo

* feat: return create table status

* fix: fix uncaught error

* refactor: use a notifier to respond to callers

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: add comment

* chore: apply suggestions from CR

* refactor: make CreateMetadata step after DatanodeCreateTable step
This commit is contained in:
Weny Xu
2023-07-04 23:24:43 +09:00
committed by GitHub
parent f37b394f1a
commit ee16262b45
20 changed files with 999 additions and 9 deletions

4
Cargo.lock generated
View File

@@ -1813,6 +1813,7 @@ dependencies = [
"common-time",
"datatypes",
"futures",
"prost",
"serde",
"serde_json",
"snafu",
@@ -4108,7 +4109,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/WenyXu/greptime-proto.git?rev=aab7d9a35900f995f9328c8588781e4d75253cba#aab7d9a35900f995f9328c8588781e4d75253cba"
source = "git+https://github.com/WenyXu/greptime-proto.git?rev=1eda4691a5d2c8ffc463d48ca2317905ba7e4b2d#1eda4691a5d2c8ffc463d48ca2317905ba7e4b2d"
dependencies = [
"prost",
"serde",
@@ -5169,6 +5170,7 @@ dependencies = [
"async-trait",
"catalog",
"chrono",
"client",
"common-base",
"common-catalog",
"common-error",

View File

@@ -73,7 +73,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/WenyXu/greptime-proto.git", rev = "aab7d9a35900f995f9328c8588781e4d75253cba" }
greptime-proto = { git = "https://github.com/WenyXu/greptime-proto.git", rev = "1eda4691a5d2c8ffc463d48ca2317905ba7e4b2d" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"

View File

@@ -14,6 +14,7 @@ common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }
common-time = { path = "../time" }
futures.workspace = true
prost.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod ddl;
pub mod lock;
pub mod router;
pub mod store;

View File

@@ -0,0 +1,217 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::result;
use api::v1::meta::submit_ddl_task_request::Task;
use api::v1::meta::{
CreateTableTask as PbCreateTableTask, Partition,
SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
SubmitDdlTaskResponse as PbSubmitDdlTaskResponse,
};
use api::v1::CreateTableExpr;
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
use crate::error::{self, Result};
use crate::table_name::TableName;
/// A DDL task that can be submitted for execution.
///
/// Table creation is the only task kind defined here.
#[derive(Debug)]
pub enum DdlTask {
    /// Creates a table as described by the wrapped [`CreateTableTask`].
    CreateTable(CreateTableTask),
}
impl DdlTask {
    /// Builds a [`DdlTask::CreateTable`] out of the create-table expression,
    /// the partitions and the raw table info.
    pub fn new_create_table(
        expr: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
    ) -> Self {
        let create_table = CreateTableTask::new(expr, partitions, table_info);
        Self::CreateTable(create_table)
    }
}
impl TryFrom<Task> for DdlTask {
type Error = error::Error;
fn try_from(task: Task) -> Result<Self> {
match task {
Task::CreateTableTask(create_table) => {
Ok(DdlTask::CreateTable(create_table.try_into()?))
}
}
}
}
/// Request carrying the DDL task to submit.
pub struct SubmitDdlTaskRequest {
    /// The task to execute.
    pub task: DdlTask,
}
impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
type Error = error::Error;
fn try_from(request: SubmitDdlTaskRequest) -> Result<Self> {
let task = match request.task {
DdlTask::CreateTable(task) => Task::CreateTableTask(PbCreateTableTask {
table_info: serde_json::to_vec(&task.table_info).context(error::SerdeJsonSnafu)?,
create_table: Some(task.create_table),
partitions: task.partitions,
}),
};
Ok(Self {
header: None,
task: Some(task),
})
}
}
/// Response returned after a DDL task has been submitted.
pub struct SubmitDdlTaskResponse {
    /// Opaque key bytes copied from the protobuf response.
    pub key: Vec<u8>,
    /// Id of the table the task refers to.
    pub table_id: TableId,
}
impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {
type Error = error::Error;
fn try_from(resp: PbSubmitDdlTaskResponse) -> Result<Self> {
let table_id = resp.table_id.context(error::InvalidProtoMsgSnafu {
err_msg: "expected table_id",
})?;
Ok(Self {
key: resp.key,
table_id: table_id.id,
})
}
}
/// Everything needed to create a table: the create-table expression, the
/// partitions and the raw table info.
#[derive(Debug, PartialEq)]
pub struct CreateTableTask {
    /// The create-table expression; it also carries the catalog, schema and table names.
    pub create_table: CreateTableExpr,
    /// Partitions of the table.
    pub partitions: Vec<Partition>,
    /// Raw table info; JSON-encoded when converted to protobuf.
    pub table_info: RawTableInfo,
}
impl TryFrom<PbCreateTableTask> for CreateTableTask {
type Error = error::Error;
fn try_from(pb: PbCreateTableTask) -> Result<Self> {
let table_info = serde_json::from_slice(&pb.table_info).context(error::SerdeJsonSnafu)?;
Ok(CreateTableTask::new(
pb.create_table.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create table",
})?,
pb.partitions,
table_info,
))
}
}
impl CreateTableTask {
    /// Creates a task from the create-table expression, the partitions and
    /// the raw table info.
    pub fn new(
        expr: CreateTableExpr,
        partitions: Vec<Partition>,
        table_info: RawTableInfo,
    ) -> CreateTableTask {
        CreateTableTask {
            create_table: expr,
            partitions,
            table_info,
        }
    }

    /// Returns an owned [`TableName`] built from the names stored in the
    /// create-table expression.
    pub fn table_name(&self) -> TableName {
        let expr = &self.create_table;

        TableName {
            catalog_name: expr.catalog_name.clone(),
            schema_name: expr.schema_name.clone(),
            table_name: expr.table_name.clone(),
        }
    }

    /// Returns a [`TableReference`] that borrows the names stored in the
    /// create-table expression.
    ///
    /// The lifetime is spelled out as `'_` so it is visible that the
    /// reference borrows from `self`, matching the other `table_ref` methods.
    pub fn table_ref(&self) -> TableReference<'_> {
        let expr = &self.create_table;

        TableReference {
            catalog: &expr.catalog_name,
            schema: &expr.schema_name,
            table: &expr.table_name,
        }
    }
}
impl Serialize for CreateTableTask {
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let table_info = serde_json::to_vec(&self.table_info)
.map_err(|err| serde::ser::Error::custom(err.to_string()))?;
let pb = PbCreateTableTask {
create_table: Some(self.create_table.clone()),
partitions: self.partitions.clone(),
table_info,
};
let buf = pb.encode_to_vec();
serializer.serialize_bytes(&buf)
}
}
impl<'de> Deserialize<'de> for CreateTableTask {
fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let buf = Vec::<u8>::deserialize(deserializer)?;
let expr: PbCreateTableTask = PbCreateTableTask::decode(&*buf)
.map_err(|err| serde::de::Error::custom(err.to_string()))?;
let expr = CreateTableTask::try_from(expr)
.map_err(|err| serde::de::Error::custom(err.to_string()))?;
Ok(expr)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::CreateTableExpr;
    use datatypes::schema::SchemaBuilder;
    use table::metadata::RawTableInfo;
    use table::test_util::table_info::test_table_info;

    use super::CreateTableTask;

    /// A task must survive a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn test_basic_ser_de_create_table_task() {
        let schema = Arc::new(SchemaBuilder::default().build().unwrap());
        let raw_table_info = RawTableInfo::from(test_table_info(1025, "foo", "bar", "baz", schema));
        let task = CreateTableTask::new(CreateTableExpr::default(), Vec::new(), raw_table_info);

        let encoded = serde_json::to_vec(&task).unwrap();
        let decoded: CreateTableTask = serde_json::from_slice(&encoded).unwrap();

        assert_eq!(task, decoded);
    }
}

View File

@@ -16,6 +16,7 @@ use std::fmt::{Display, Formatter};
use api::v1::meta::TableName as PbTableName;
use serde::{Deserialize, Serialize};
use table::engine::TableReference;
#[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
pub struct TableName {
@@ -46,6 +47,14 @@ impl TableName {
table_name: table_name.into(),
}
}
/// Returns a [`TableReference`] that borrows the catalog, schema and table
/// names of this `TableName`.
pub fn table_ref(&self) -> TableReference<'_> {
    let catalog = self.catalog_name.as_str();
    let schema = self.schema_name.as_str();
    let table = self.table_name.as_str();

    TableReference {
        catalog,
        schema,
        table,
    }
}
}
impl From<TableName> for PbTableName {

View File

@@ -13,6 +13,7 @@ api = { path = "../api" }
async-stream.workspace = true
async-trait = "0.1"
catalog = { path = "../catalog" }
client = { path = "../client" }
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }

110
src/meta-srv/src/ddl.rs Normal file
View File

@@ -0,0 +1,110 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use client::client_manager::DatanodeClients;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::TableRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::procedure::create_table::CreateTableProcedure;
use crate::service::store::kv::KvStoreRef;
/// Shared, reference-counted handle to a [`DdlManager`].
pub type DdlManagerRef = Arc<DdlManager>;
/// Submits DDL procedures to the procedure manager and hands them the
/// resources they need.
pub struct DdlManager {
    /// Procedures are registered with and submitted to this manager.
    procedure_manager: ProcedureManagerRef,
    /// Key-value store passed on to procedures through [`DdlContext`].
    kv_store: KvStoreRef,
    /// Datanode clients passed on to procedures through [`DdlContext`].
    datanode_clients: Arc<DatanodeClients>,
}
/// Resources shared with every DDL procedure: the key-value store and the
/// datanode clients.
// TODO(weny): remove the `allow` in following PRs.
#[allow(unused)]
#[derive(Clone)]
pub(crate) struct DdlContext {
    /// Key-value store used by procedures.
    pub(crate) kv_store: KvStoreRef,
    /// Clients used by procedures to send requests to datanodes.
    pub(crate) datanode_clients: Arc<DatanodeClients>,
}
impl DdlManager {
pub(crate) fn new(
procedure_manager: ProcedureManagerRef,
kv_store: KvStoreRef,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
Self {
procedure_manager,
kv_store,
datanode_clients,
}
}
pub(crate) fn create_context(&self) -> DdlContext {
DdlContext {
kv_store: self.kv_store.clone(),
datanode_clients: self.datanode_clients.clone(),
}
}
pub(crate) fn try_start(&self) -> Result<()> {
let context = self.create_context();
self.procedure_manager
.register_loader(
CreateTableProcedure::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
CreateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: CreateTableProcedure::TYPE_NAME,
})
}
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
create_table_task: CreateTableTask,
table_route: TableRoute,
) -> Result<ProcedureId> {
let context = self.create_context();
let procedure =
CreateTableProcedure::new(cluster_id, create_table_task, table_route, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
let procedure_id = procedure_with_id.id;
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu)?;
watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu)?;
Ok(procedure_id)
}
}

View File

@@ -13,14 +13,54 @@
// limitations under the License.
use common_error::prelude::*;
use common_meta::peer::Peer;
use snafu::Location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::error::TryRecvError;
use tonic::codegen::http;
use tonic::Code;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to execute transaction: {}", msg))]
Txn { location: Location, msg: String },
#[snafu(display(
"Unexpected table_id changed, expected: {}, found: {}",
expected,
found,
))]
TableIdChanged {
location: Location,
expected: u64,
found: u64,
},
#[snafu(display("Failed to receive status, source: {}", source,))]
TryReceiveStatus {
location: Location,
source: TryRecvError,
},
#[snafu(display(
"Failed to request Datanode, expected: {}, but only {} available",
expected,
available
))]
NoEnoughAvailableDatanode {
location: Location,
expected: usize,
available: usize,
},
#[snafu(display("Failed to request Datanode {}, source: {}", peer, source))]
RequestDatanode {
location: Location,
peer: Peer,
source: client::Error,
},
#[snafu(display("Failed to send shutdown signal"))]
SendShutdownSignal { source: SendError<()> },
@@ -274,6 +314,18 @@ pub enum Error {
source: common_procedure::Error,
},
// Raised when waiting for a submitted procedure fails. The message used to be
// a copy of `RecoverProcedure`'s, which misreported wait failures as recovery failures.
#[snafu(display("Failed to wait procedure done, source: {source}"))]
WaitProcedure {
    location: Location,
    source: common_procedure::Error,
},
#[snafu(display("Failed to submit procedure, source: {source}"))]
SubmitProcedure {
location: Location,
source: common_procedure::Error,
},
#[snafu(display("Schema already exists, name: {schema_name}"))]
SchemaAlreadyExists {
schema_name: String,
@@ -413,7 +465,9 @@ impl ErrorExt for Error {
| Error::MailboxReceiver { .. }
| Error::RetryLater { .. }
| Error::StartGrpc { .. }
| Error::Combine { .. } => StatusCode::Internal,
| Error::Combine { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::TryReceiveStatus { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
| Error::MissingRequestHeader { .. }
@@ -437,10 +491,15 @@ impl ErrorExt for Error {
| Error::InvalidUtf8Value { .. }
| Error::UnexpectedInstructionReply { .. }
| Error::EtcdTxnOpResponse { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
| Error::Unexpected { .. }
| Error::Txn { .. }
| Error::TableIdChanged { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::RequestDatanode { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::RecoverProcedure { source, .. } => source.status_code(),
Error::RecoverProcedure { source, .. }
| Error::SubmitProcedure { source, .. }
| Error::WaitProcedure { source, .. } => source.status_code(),
Error::ShutdownServer { source, .. } | Error::StartHttp { source, .. } => {
source.status_code()
}

View File

@@ -17,6 +17,7 @@
pub mod bootstrap;
pub mod cluster;
pub mod ddl;
pub mod election;
pub mod error;
mod failure_detector;

View File

@@ -28,6 +28,7 @@ use snafu::ResultExt;
use tokio::sync::broadcast::error::RecvError;
use crate::cluster::MetaPeerClientRef;
use crate::ddl::DdlManagerRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
@@ -135,6 +136,7 @@ pub struct MetaSrv {
procedure_manager: ProcedureManagerRef,
metadata_service: MetadataServiceRef,
mailbox: MailboxRef,
ddl_manager: DdlManagerRef,
}
impl MetaSrv {
@@ -276,6 +278,10 @@ impl MetaSrv {
}
/// Returns a reference to the DDL manager held by this `MetaSrv`.
#[inline]
pub fn ddl_manager(&self) -> &DdlManagerRef {
&self.ddl_manager
}
/// Returns a reference to the procedure manager held by this `MetaSrv`.
pub fn procedure_manager(&self) -> &ProcedureManagerRef {
&self.procedure_manager
}

View File

@@ -15,9 +15,11 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use client::client_manager::DatanodeClients;
use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::DdlManager;
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
@@ -210,6 +212,15 @@ impl MetaSrvBuilder {
}
};
// TODO(weny): consider modifying the default config of the procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
Arc::new(DatanodeClients::default()),
));
let _ = ddl_manager.try_start();
Ok(MetaSrv {
started,
options,
@@ -225,6 +236,7 @@ impl MetaSrvBuilder {
procedure_manager,
metadata_service,
mailbox,
ddl_manager,
})
}
}

View File

@@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod create_table;
pub mod region_failover;
pub(crate) mod state_store;

View File

@@ -0,0 +1,330 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::TableRouteValue;
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use client::Database;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::key::TableRouteKey;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::service::router::create_table_global_value;
use crate::service::store::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::table_routes::get_table_global_value;
/// Procedure that creates a table: it checks for an existing table, asks the
/// datanodes to create it and finally registers the table metadata.
// TODO(weny): remove the `allow` in following PRs.
#[allow(unused)]
pub struct CreateTableProcedure {
    /// Key-value store and datanode clients used by the steps.
    context: DdlContext,
    /// Holds the persistent state of the procedure.
    creator: TableCreator,
}
// TODO(weny): remove in following PRs.
#[allow(dead_code)]
impl CreateTableProcedure {
/// Type name of this procedure; also used to register its loader.
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
/// Creates a procedure that starts in the `Prepare` state.
pub(crate) fn new(
    cluster_id: u64,
    task: CreateTableTask,
    table_route: TableRoute,
    context: DdlContext,
) -> Self {
    let creator = TableCreator::new(cluster_id, task, table_route);

    Self { context, creator }
}
pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(CreateTableProcedure {
context,
creator: TableCreator { data },
})
}
fn global_table_key(&self) -> TableGlobalKey {
let table_ref = self.creator.data.table_ref();
TableGlobalKey {
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
}
}
/// Returns the owned name of the table being created, taken from the task.
fn table_name(&self) -> TableName {
    let task = &self.creator.data.task;
    task.table_name()
}
/// Checks whether the table already exists.
///
/// - Not found: moves on to the `DatanodeCreateTable` state.
/// - Found and `create_if_not_exists` is set: the procedure is done.
/// - Found otherwise: fails with `TableAlreadyExists`.
async fn on_prepare(&mut self) -> Result<Status> {
    let key = self.global_table_key();
    let table_exists = get_table_global_value(&self.context.kv_store, &key)
        .await?
        .is_some();

    if !table_exists {
        self.creator.data.state = CreateTableState::DatanodeCreateTable;
        return Ok(Status::executing(true));
    }

    ensure!(
        self.creator.data.task.create_table.create_if_not_exists,
        error::TableAlreadyExistsSnafu {
            table_name: self.creator.data.table_ref().to_string(),
        }
    );

    Ok(Status::Done)
}
/// Registers the `TableRouteValue` and the `TableGlobalValue` in the kv store.
///
/// Both keys are written in one transaction that is guarded by "neither key
/// exists yet"; when the guard fails a `Txn` error is returned.
async fn register_metadata(&self) -> Result<()> {
    let data = &self.creator.data;
    let table_name = self.table_name();
    let table_id = data.table_route.table.id;

    // Keys.
    let route_key = TableRouteKey::with_table_name(table_id, &table_name.clone().into())
        .key()
        .into_bytes();
    let global_key = TableGlobalKey {
        catalog_name: table_name.catalog_name,
        schema_name: table_name.schema_name,
        table_name: table_name.table_name,
    }
    .to_string()
    .into_bytes();

    // Values.
    let (peers, raw_route) = data
        .table_route
        .clone()
        .try_into_raw()
        .context(error::ConvertProtoDataSnafu)?;
    let route_value = TableRouteValue {
        peers,
        table_route: Some(raw_route),
    };
    let global_value = create_table_global_value(&route_value, data.task.table_info.clone())?
        .as_bytes()
        .context(error::InvalidCatalogValueSnafu)?;

    let guards = vec![
        Compare::with_not_exist_value(route_key.clone(), CompareOp::Equal),
        Compare::with_not_exist_value(global_key.clone(), CompareOp::Equal),
    ];
    let puts = vec![
        TxnOp::Put(route_key, route_value.into()),
        TxnOp::Put(global_key, global_value),
    ];
    let txn = Txn::new().when(guards).and_then(puts);

    let resp = self.context.kv_store.txn(txn).await?;
    ensure!(
        resp.succeeded,
        error::TxnSnafu {
            msg: "table_route_key or table_global_key exists"
        }
    );

    Ok(())
}
/// Final step: makes sure the table metadata is present in the kv store.
///
/// When the global value is already there, only the table id is verified;
/// otherwise the metadata is registered. Either way the procedure is done.
async fn on_create_metadata(&mut self) -> Result<Status> {
    let key = self.global_table_key();
    let existing = get_table_global_value(&self.context.kv_store, &key).await?;

    if let Some(table_global_value) = existing {
        // The metasrv crashed right after the metadata was created,
        // so the metadata is already in the kv store.
        let found = table_global_value.table_id() as u64;
        let expected = self.creator.data.table_route.table.id;

        // Consider a sequence like:
        // create table A; create table A (from another frontend, failed somehow);
        // rename table A to B; create table A (recovered).
        // We must ensure the table_id has not changed.
        ensure!(
            found == expected,
            error::TableIdChangedSnafu { expected, found }
        );
    } else {
        // Nothing registered yet: register the metadata now.
        self.register_metadata().await?;
    }

    Ok(Status::Done)
}
/// Asks every leader datanode of the table route to create its regions.
///
/// One background task is spawned per leader datanode and all of them are
/// awaited. A datanode answering `TableAlreadyExists` counts as success.
///
/// # Errors
///
/// - `RetryLater` when a spawned task cannot be joined.
/// - The datanode request error (`RequestDatanode`) when any datanode fails
///   to create the table. Previously this inner result was collected and
///   then discarded, so the procedure went on to register metadata even
///   though a datanode had failed.
///
/// On success the state moves on to `CreateMetadata`.
async fn on_datanode_create_table(&mut self) -> Result<Status> {
    let table_route = &self.creator.data.table_route;
    let table_name = self.table_name();
    let clients = self.context.datanode_clients.clone();
    let leaders = table_route.find_leaders();
    let mut joins = Vec::with_capacity(leaders.len());

    for datanode in leaders {
        let client = clients.get_client(&datanode).await;
        let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client);

        // Each datanode only creates the regions it leads.
        let regions = table_route.find_leader_regions(&datanode);
        let mut create_expr_for_region = self.creator.data.task.create_table.clone();
        create_expr_for_region.region_numbers = regions;

        joins.push(common_runtime::spawn_bg(async move {
            if let Err(err) = client
                .create(create_expr_for_region)
                .await
                .context(error::RequestDatanodeSnafu { peer: datanode })
            {
                // TODO(weny): add tests for `TableAlreadyExists`
                if err.status_code() != StatusCode::TableAlreadyExists {
                    return Err(err);
                }
            }
            Ok(())
        }));
    }

    // Each joined item is `Result<Result<()>, JoinError>`: the outer layer is
    // the join outcome, the inner layer is the datanode request outcome.
    // Both layers must be propagated.
    for joined in join_all(joins).await {
        joined.map_err(|err| {
            error::RetryLaterSnafu {
                reason: format!(
                    "Failed to execute create table on datanode, source: {}",
                    err
                ),
            }
            .build()
        })??;
    }

    self.creator.data.state = CreateTableState::CreateMetadata;

    Ok(Status::executing(true))
}
}
#[async_trait]
impl Procedure for CreateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let error_handler = |e| {
if matches!(e, error::Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
};
match self.creator.data.state {
CreateTableState::Prepare => self.on_prepare().await.map_err(error_handler),
CreateTableState::DatanodeCreateTable => {
self.on_datanode_create_table().await.map_err(error_handler)
}
CreateTableState::CreateMetadata => {
self.on_create_metadata().await.map_err(error_handler)
}
}
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();
let key = common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);
LockKey::single(key)
}
}
/// Wrapper around the persistent [`CreateTableData`] of a create-table procedure.
pub struct TableCreator {
    /// State and inputs of the procedure.
    data: CreateTableData,
}
impl TableCreator {
    /// Creates a `TableCreator` whose data starts in the `Prepare` state.
    pub fn new(cluster_id: u64, task: CreateTableTask, table_route: TableRoute) -> Self {
        let data = CreateTableData {
            state: CreateTableState::Prepare,
            cluster_id,
            task,
            table_route,
        };

        Self { data }
    }
}
/// States of the create-table procedure.
///
/// `execute` dispatches on this value; the steps move it from `Prepare` to
/// `DatanodeCreateTable` and then to `CreateMetadata`.
#[derive(Debug, Serialize, Deserialize)]
enum CreateTableState {
    /// Prepares to create the table.
    Prepare,
    /// The datanodes create the table.
    DatanodeCreateTable,
    /// Creates the metadata.
    CreateMetadata,
}
/// Serializable data of a create-table procedure; this is what `dump` writes
/// and `from_json` reads.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTableData {
    /// Current state of the procedure.
    state: CreateTableState,
    /// The create-table task being executed.
    task: CreateTableTask,
    /// Route of the table being created.
    table_route: TableRoute,
    /// Id of the cluster the request came from.
    cluster_id: u64,
}
impl CreateTableData {
    /// Returns a [`TableReference`] borrowed from the task.
    fn table_ref(&self) -> TableReference<'_> {
        let task = &self.task;
        task.table_ref()
    }
}

View File

@@ -19,6 +19,7 @@ use tonic::{Response, Status};
pub mod admin;
pub mod cluster;
pub mod ddl;
mod heartbeat;
pub mod lock;
pub mod mailbox;

View File

@@ -0,0 +1,187 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{
ddl_task_server, Partition, Region, RegionRoute, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
Table, TableRoute,
};
use api::v1::TableId;
use common_meta::rpc::ddl::{CreateTableTask, DdlTask};
use common_meta::rpc::router;
use common_meta::table_name::TableName;
use common_telemetry::{info, warn};
use snafu::{OptionExt, ResultExt};
use table::metadata::RawTableInfo;
use tonic::{Request, Response};
use super::GrpcResult;
use crate::ddl::DdlManagerRef;
use crate::error::{self, Result};
use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef};
use crate::sequence::SequenceRef;
#[async_trait::async_trait]
impl ddl_task_server::DdlTask for MetaSrv {
async fn submit_ddl_task(
&self,
request: Request<SubmitDdlTaskRequest>,
) -> GrpcResult<SubmitDdlTaskResponse> {
let SubmitDdlTaskRequest { header, task, .. } = request.into_inner();
let header = header.context(error::MissingRequestHeaderSnafu)?;
let task: DdlTask = task
.context(error::MissingRequiredParameterSnafu { param: "task" })?
.try_into()
.context(error::ConvertProtoDataSnafu)?;
let ctx = SelectorContext {
datanode_lease_secs: self.options().datanode_lease_secs,
server_addr: self.options().server_addr.clone(),
kv_store: self.kv_store(),
meta_peer_client: self.meta_peer_client(),
catalog: None,
schema: None,
table: None,
};
let resp = match task {
DdlTask::CreateTable(create_table_task) => {
handle_create_table_task(
header.cluster_id,
create_table_task,
ctx,
self.selector().clone(),
self.table_id_sequence().clone(),
self.ddl_manager().clone(),
)
.await?
}
};
Ok(Response::new(resp))
}
}
/// Handles a create-table task.
///
/// Computes the table route (which also allocates the table id and writes it
/// into the task's table info), submits the create-table procedure through
/// the DDL manager, and returns the procedure id as the response key together
/// with the table id.
async fn handle_create_table_task(
    cluster_id: u64,
    mut create_table_task: CreateTableTask,
    ctx: SelectorContext,
    selector: SelectorRef,
    table_id_sequence: SequenceRef,
    ddl_manager: DdlManagerRef,
) -> Result<SubmitDdlTaskResponse> {
    let table_name = create_table_task.table_name();

    // Same context as the caller's, with the table identity filled in.
    let ctx = SelectorContext {
        catalog: Some(table_name.catalog_name.clone()),
        schema: Some(table_name.schema_name.clone()),
        table: Some(table_name.table_name.clone()),
        ..ctx
    };

    // The task keeps its own partitions; the route calculation consumes a copy.
    let partitions = create_table_task.partitions.clone();

    let table_route = handle_create_table_route(
        cluster_id,
        table_name,
        partitions,
        &mut create_table_task.table_info,
        ctx,
        selector,
        table_id_sequence,
    )
    .await?;
    let table_id = table_route.table.id;

    let id = ddl_manager
        .submit_create_table_task(cluster_id, create_table_task, table_route)
        .await?;
    info!("Table: {table_id} created via procedure_id {id:?}");

    Ok(SubmitDdlTaskResponse {
        key: id.to_string().into(),
        table_id: Some(TableId {
            // NOTE(review): narrowing u64 -> u32; the same narrowing is applied when
            // the id is written into the table info, so both stay consistent.
            id: table_id as u32,
        }),
        ..Default::default()
    })
}
/// Pre-calculates the metadata of a create-table task.
///
/// Selects datanodes, allocates the next table id (also written into
/// `table_info`), and builds a table route with one region per partition.
///
/// # Errors
///
/// Returns `NoEnoughAvailableDatanode` when fewer datanodes than partitions
/// were selected, and propagates selector, sequence and route-conversion errors.
async fn handle_create_table_route(
    cluster_id: u64,
    table_name: TableName,
    partitions: Vec<Partition>,
    table_info: &mut RawTableInfo,
    ctx: SelectorContext,
    selector: SelectorRef,
    table_id_sequence: SequenceRef,
) -> Result<router::TableRoute> {
    let mut peers = selector.select(cluster_id, &ctx).await?;

    if peers.len() < partitions.len() {
        warn!("Create table failed due to no enough available datanodes, table: {table_name:?}, partition number: {}, datanode number: {}", partitions.len(), peers.len());
        return error::NoEnoughAvailableDatanodeSnafu {
            expected: partitions.len(),
            available: peers.len(),
        }
        .fail();
    }

    // We don't need to keep all peers, just truncate them to the number of partitions.
    // The check above guarantees there is at least one peer per partition.
    peers.truncate(partitions.len());

    let id = table_id_sequence.next().await?;
    // NOTE(review): narrowing u64 -> u32 for the table info's id.
    table_info.ident.table_id = id as u32;

    let table = Table {
        id,
        table_name: Some(table_name.into()),
        ..Default::default()
    };

    // One region per partition; the region id is the partition's position.
    let region_routes = partitions
        .into_iter()
        .enumerate()
        .map(|(i, partition)| {
            let region = Region {
                id: i as u64,
                partition: Some(partition),
                ..Default::default()
            };

            RegionRoute {
                region: Some(region),
                leader_peer_index: (i % peers.len()) as u64,
                follower_peer_indexes: vec![], // follower_peers is not supported at the moment
            }
        })
        .collect::<Vec<_>>();

    let table_route = TableRoute {
        table: Some(table),
        region_routes,
    };

    router::TableRoute::try_from_raw(&peers, table_route).context(error::TableRouteConversionSnafu)
}

View File

@@ -258,7 +258,7 @@ async fn handle_create(
})
}
fn create_table_global_value(
pub(crate) fn create_table_global_value(
table_route_value: &TableRouteValue,
table_info: RawTableInfo,
) -> Result<TableGlobalValue> {
@@ -349,7 +349,7 @@ async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result<RouteResponse
})
}
fn fill_table_routes(
pub(crate) fn fill_table_routes(
tables: Vec<(TableGlobalValue, TableRouteValue)>,
) -> Result<(Vec<Peer>, Vec<TableRoute>)> {
let mut peer_dict = PeerDict::default();
@@ -407,7 +407,7 @@ async fn fetch_tables(
Ok(tables)
}
fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> {
pub(crate) fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> {
TableRouteKey {
table_id,
catalog_name: &t.catalog_name,

View File

@@ -183,7 +183,6 @@ impl ErrorExt for Error {
| BuildTableMeta { .. }
| BuildTableInfo { .. }
| BuildRegionDescriptor { .. }
| TableExists { .. }
| ProjectedColumnNotFound { .. }
| InvalidPrimaryKey { .. }
| MissingTimestampIndex { .. }
@@ -191,6 +190,8 @@ impl ErrorExt for Error {
| InvalidRawSchema { .. }
| VersionChanged { .. } => StatusCode::InvalidArguments,
TableExists { .. } => StatusCode::TableAlreadyExists,
ConvertRaw { .. } => StatusCode::Unexpected,
ScanTableManifest { .. } | UpdateTableManifest { .. } => StatusCode::StorageUnavailable,

View File

@@ -15,6 +15,7 @@
mod empty_table;
mod memtable;
mod mock_engine;
pub mod table_info;
pub use empty_table::EmptyTable;
pub use memtable::MemTable;

View File

@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::schema::SchemaRef;
use crate::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
/// Builds a [`TableInfo`] for tests.
///
/// The table is a `mito` base table at version 0 with a single region (`1`),
/// no primary key or value indices, and default options.
///
/// # Panics
///
/// Panics when either builder fails.
pub fn test_table_info(
    table_id: u32,
    table_name: &str,
    schema_name: &str,
    catalog_name: &str,
    schema: SchemaRef,
) -> TableInfo {
    let mut meta_builder = TableMetaBuilder::default();
    meta_builder
        .schema(schema)
        .primary_key_indices(vec![])
        .value_indices(vec![])
        .engine("mito".to_string())
        .next_column_id(0)
        .engine_options(Default::default())
        .options(Default::default())
        .created_on(Default::default())
        .region_numbers(vec![1]);
    let meta = meta_builder.build().unwrap();

    let mut info_builder = TableInfoBuilder::default();
    info_builder
        .table_id(table_id)
        .table_version(0 as TableVersion)
        .name(table_name.to_string())
        .schema_name(schema_name.to_string())
        .catalog_name(catalog_name.to_string())
        .desc(None)
        .table_type(TableType::Base)
        .meta(meta);
    info_builder.build().unwrap()
}