diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 3c1ae0a514..0c1c2cd600 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -15,7 +15,7 @@ mod client; pub mod client_manager; mod database; -mod error; +pub mod error; pub mod load_balance; mod metrics; mod stream_insert; diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 9d4287d31a..96913cfe15 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -16,11 +16,11 @@ use std::result; use api::v1::meta::submit_ddl_task_request::Task; use api::v1::meta::{ - CreateTableTask as PbCreateTableTask, Partition, + CreateTableTask as PbCreateTableTask, DropTableTask as PbDropTableTask, Partition, SubmitDdlTaskRequest as PbSubmitDdlTaskRequest, SubmitDdlTaskResponse as PbSubmitDdlTaskResponse, }; -use api::v1::CreateTableExpr; +use api::v1::{CreateTableExpr, DropTableExpr}; use prost::Message; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -33,6 +33,7 @@ use crate::table_name::TableName; #[derive(Debug)] pub enum DdlTask { CreateTable(CreateTableTask), + DropTable(DropTableTask), } impl DdlTask { @@ -52,6 +53,7 @@ impl TryFrom for DdlTask { Task::CreateTableTask(create_table) => { Ok(DdlTask::CreateTable(create_table.try_into()?)) } + Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)), _ => todo!(), } } @@ -71,6 +73,15 @@ impl TryFrom for PbSubmitDdlTaskRequest { create_table: Some(task.create_table), partitions: task.partitions, }), + + DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask { + drop_table: Some(DropTableExpr { + catalog_name: task.catalog, + schema_name: task.schema, + table_name: task.table, + table_id: Some(api::v1::TableId { id: task.table_id }), + }), + }), }; Ok(Self { header: None, @@ -98,6 +109,54 @@ impl TryFrom for SubmitDdlTaskResponse { } } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct DropTableTask { + pub catalog: String, + pub schema: String, + pub table: String, + pub table_id: TableId, +} + +impl DropTableTask { + pub fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.catalog, + schema: &self.schema, + table: &self.table, + } + } + + pub fn table_name(&self) -> TableName { + TableName { + catalog_name: self.catalog.to_string(), + schema_name: self.schema.to_string(), + table_name: self.table.to_string(), + } + } +} + +impl TryFrom for DropTableTask { + type Error = error::Error; + + fn try_from(pb: PbDropTableTask) -> Result { + let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu { + err_msg: "expected drop table", + })?; + + Ok(Self { + catalog: drop_table.catalog_name, + schema: drop_table.schema_name, + table: drop_table.table_name, + table_id: drop_table + .table_id + .context(error::InvalidProtoMsgSnafu { + err_msg: "expected table_id", + })? + .id, + }) + } +} + #[derive(Debug, PartialEq)] pub struct CreateTableTask { pub create_table: CreateTableExpr, diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index a465328b06..ba84055feb 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -15,13 +15,15 @@ use std::sync::Arc; use client::client_manager::DatanodeClients; -use common_meta::rpc::ddl::CreateTableTask; +use common_meta::rpc::ddl::{CreateTableTask, DropTableTask}; use common_meta::rpc::router::TableRoute; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use snafu::ResultExt; use crate::error::{self, Result}; use crate::procedure::create_table::CreateTableProcedure; +use crate::procedure::drop_table::DropTableProcedure; +use crate::service::mailbox::MailboxRef; use crate::service::store::kv::KvStoreRef; pub type DdlManagerRef = Arc; @@ -30,6 +32,8 @@ pub struct DdlManager { procedure_manager: ProcedureManagerRef, kv_store: KvStoreRef, datanode_clients: Arc, + mailbox: MailboxRef, + server_addr: String, } // TODO(weny): removes in following PRs. @@ -38,6 +42,8 @@ pub struct DdlManager { pub(crate) struct DdlContext { pub(crate) kv_store: KvStoreRef, pub(crate) datanode_clients: Arc, + pub(crate) mailbox: MailboxRef, + pub(crate) server_addr: String, } impl DdlManager { @@ -45,11 +51,15 @@ impl DdlManager { procedure_manager: ProcedureManagerRef, kv_store: KvStoreRef, datanode_clients: Arc, + mailbox: MailboxRef, + server_addr: String, ) -> Self { Self { procedure_manager, kv_store, datanode_clients, + mailbox, + server_addr, } } @@ -57,6 +67,8 @@ impl DdlManager { DdlContext { kv_store: self.kv_store.clone(), datanode_clients: self.datanode_clients.clone(), + mailbox: self.mailbox.clone(), + server_addr: self.server_addr.clone(), } } @@ -92,6 +104,21 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + pub async fn submit_drop_table_task( + &self, + cluster_id: u64, + drop_table_task: DropTableTask, + table_route: TableRoute, + ) -> Result { + let context = self.create_context(); + + let procedure = DropTableProcedure::new(cluster_id, drop_table_task, table_route, context); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result { let procedure_id = procedure_with_id.id; diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 4f94d921f7..8bfb8b071e 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -14,15 +14,21 @@ use common_error::prelude::*; use common_meta::peer::Peer; +use common_runtime::JoinError; use snafu::Location; use tokio::sync::mpsc::error::SendError; -use tokio::sync::oneshot::error::TryRecvError; use tonic::codegen::http; use tonic::Code; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to join a future: {}", source))] + Join { + location: Location, + source: JoinError, + }, + #[snafu(display("Failed to execute transaction: {}", msg))] Txn { location: Location, msg: String }, @@ -37,12 +43,6 @@ pub enum Error { found: u64, }, - #[snafu(display("Failed to receive status, source: {}", source,))] - TryReceiveStatus { - location: Location, - source: TryRecvError, - }, - #[snafu(display( "Failed to request Datanode, expected: {}, but only {} available", expected, @@ -467,7 +467,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::Combine { .. } | Error::NoEnoughAvailableDatanode { .. } - | Error::TryReceiveStatus { .. } => StatusCode::Internal, + | Error::Join { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } | Error::MissingRequestHeader { .. } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 94dcd78218..af1314197e 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -14,6 +14,7 @@ #![feature(async_closure)] #![feature(btree_drain_filter)] +#![feature(result_flattening)] pub mod bootstrap; pub mod cluster; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 38720bf7b5..86ab963c20 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -158,6 +158,17 @@ impl MetaSrvBuilder { .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); + // TODO(weny): considers to modify the default config of procedure manager + let ddl_manager = Arc::new(DdlManager::new( + procedure_manager.clone(), + kv_store.clone(), + Arc::new(DatanodeClients::default()), + mailbox.clone(), + options.server_addr.clone(), + )); + + let _ = ddl_manager.try_start(); + let handler_group = match handler_group { Some(handler_group) => handler_group, None => { @@ -212,15 +223,6 @@ impl MetaSrvBuilder { } }; - // TODO(weny): considers to modify the default config of procedure manager - let ddl_manager = Arc::new(DdlManager::new( - procedure_manager.clone(), - kv_store.clone(), - Arc::new(DatanodeClients::default()), - )); - - let _ = ddl_manager.try_start(); - Ok(MetaSrv { started, options, diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index f819ccc11c..f2872cd3bb 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -13,5 +13,7 @@ // limitations under the License. pub mod create_table; +pub mod drop_table; pub mod region_failover; pub(crate) mod state_store; +mod utils; diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index aa7ba12c0e..52e062dcb2 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -23,15 +23,14 @@ use common_meta::rpc::ddl::CreateTableTask; use common_meta::rpc::router::TableRoute; use common_meta::table_name::TableName; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; -use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, -}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use table::engine::TableReference; use table::metadata::TableId; +use super::utils::{handle_request_datanode_error, handle_retry_error}; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::service::router::create_table_global_value; @@ -212,14 +211,9 @@ impl CreateTableProcedure { create_expr_for_region.region_numbers = regions; joins.push(common_runtime::spawn_bg(async move { - if let Err(err) = client - .create(create_expr_for_region) - .await - .context(error::RequestDatanodeSnafu { peer: datanode }) - { - // TODO(weny): add tests for `TableAlreadyExists` + if let Err(err) = client.create(create_expr_for_region).await { if err.status_code() != StatusCode::TableAlreadyExists { - return Err(err); + return Err(handle_request_datanode_error(datanode)(err)); } } Ok(()) @@ -229,17 +223,7 @@ impl CreateTableProcedure { let _ = join_all(joins) .await .into_iter() - .map(|result| { - result.map_err(|err| { - error::RetryLaterSnafu { - reason: format!( - "Failed to execute create table on datanode, source: {}", - err - ), - } - .build() - }) - }) + .map(|e| e.context(error::JoinSnafu)) .collect::>>()?; self.creator.data.state = CreateTableState::CreateMetadata; @@ -255,22 +239,12 @@ impl Procedure for CreateTableProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let error_handler = |e| { - if matches!(e, error::Error::RetryLater { .. }) { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - }; match self.creator.data.state { - CreateTableState::Prepare => self.on_prepare().await.map_err(error_handler), - CreateTableState::DatanodeCreateTable => { - self.on_datanode_create_table().await.map_err(error_handler) - } - CreateTableState::CreateMetadata => { - self.on_create_metadata().await.map_err(error_handler) - } + CreateTableState::Prepare => self.on_prepare().await, + CreateTableState::DatanodeCreateTable => self.on_datanode_create_table().await, + CreateTableState::CreateMetadata => self.on_create_metadata().await, } + .map_err(handle_retry_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/meta-srv/src/procedure/drop_table.rs b/src/meta-srv/src/procedure/drop_table.rs new file mode 100644 index 0000000000..f36a83183e --- /dev/null +++ b/src/meta-srv/src/procedure/drop_table.rs @@ -0,0 +1,264 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::MailboxMessage; +use api::v1::{DropTableExpr, TableId}; +use async_trait::async_trait; +use client::Database; +use common_catalog::consts::MITO_ENGINE; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_meta::ident::TableIdent; +use common_meta::instruction::Instruction; +use common_meta::rpc::ddl::DropTableTask; +use common_meta::rpc::router::TableRoute; +use common_meta::table_name::TableName; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_telemetry::debug; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use table::engine::TableReference; + +use super::utils::{build_table_metadata_key, handle_retry_error}; +use crate::ddl::DdlContext; +use crate::error; +use crate::error::Result; +use crate::procedure::utils::{build_table_route_value, handle_request_datanode_error}; +use crate::service::mailbox::BroadcastChannel; +use crate::service::store::txn::{Compare, CompareOp, Txn, TxnOp}; +use crate::table_routes::fetch_table; +pub struct DropTableProcedure { + context: DdlContext, + data: DropTableData, +} + +// TODO(weny): removes in following PRs. +#[allow(unused)] +impl DropTableProcedure { + pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::DropTable"; + + pub(crate) fn new( + cluster_id: u64, + task: DropTableTask, + table_route: TableRoute, + context: DdlContext, + ) -> Self { + Self { + context, + data: DropTableData::new(cluster_id, task, table_route), + } + } + + pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(Self { context, data }) + } + + /// Removes the table metadata. + async fn on_remove_metadata(&mut self) -> Result { + let table_ref = self.data.table_ref(); + + // If metadata not exists (might have already been removed). + if fetch_table(&self.context.kv_store, table_ref) + .await? + .is_none() + { + self.data.state = DropTableState::InvalidateTableCache; + + return Ok(Status::executing(true)); + } + + let table_ref = self.data.table_ref(); + let table_id = self.data.task.table_id; + + let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id); + let table_route_value = build_table_route_value(self.data.table_route.clone())?; + + // To protect the potential resource leak issues. + // We must compare the table route value, before deleting. + let txn = Txn::new() + .when(vec![Compare::with_value( + table_route_key.to_string().into_bytes(), + CompareOp::Equal, + table_route_value.into(), + )]) + .and_then(vec![ + TxnOp::Delete(table_route_key.to_string().into_bytes()), + TxnOp::Delete(table_global_key.to_string().into_bytes()), + ]); + let resp = self.context.kv_store.txn(txn).await?; + + ensure!( + resp.succeeded, + error::TxnSnafu { + msg: "table_route_value changed" + } + ); + + self.data.state = DropTableState::InvalidateTableCache; + + Ok(Status::executing(true)) + } + + /// Broadcasts invalidate table cache instruction. + async fn on_broadcast(&mut self) -> Result { + let table_name = self.data.table_name(); + + let table_ident = TableIdent { + catalog: table_name.catalog_name, + schema: table_name.schema_name, + table: table_name.table_name, + table_id: self.data.task.table_id, + // TODO(weny): retrieves the engine from the upper. + engine: MITO_ENGINE.to_string(), + }; + let instruction = Instruction::InvalidateTableCache(table_ident); + + let msg = &MailboxMessage::json_message( + "Invalidate Table Cache by dropping table procedure", + &format!("Metasrv@{}", self.context.server_addr), + "Frontend broadcast", + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + self.context + .mailbox + .broadcast(&BroadcastChannel::Frontend, msg) + .await?; + + self.data.state = DropTableState::DatanodeDropTable; + + Ok(Status::executing(true)) + } + + /// Executes drop table instruction on datanode. + async fn on_datanode_drop_table(&mut self) -> Result { + let table_route = &self.data.table_route; + + let table_ref = self.data.table_ref(); + let table_id = self.data.task.table_id; + + let clients = self.context.datanode_clients.clone(); + let leaders = table_route.find_leaders(); + let mut joins = Vec::with_capacity(leaders.len()); + + let expr = DropTableExpr { + catalog_name: table_ref.catalog.to_string(), + schema_name: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + table_id: Some(TableId { id: table_id }), + }; + + for datanode in leaders { + debug!("Dropping table {table_ref} on Datanode {datanode:?}"); + + let client = clients.get_client(&datanode).await; + let client = Database::new(table_ref.catalog, table_ref.schema, client); + let expr = expr.clone(); + joins.push(common_runtime::spawn_bg(async move { + if let Err(err) = client.drop_table(expr).await { + // TODO(weny): add tests for `TableNotFound` + if err.status_code() != StatusCode::TableNotFound { + return Err(handle_request_datanode_error(datanode)(err)); + } + } + Ok(()) + })); + } + + let _ = join_all(joins) + .await + .into_iter() + .map(|e| e.context(error::JoinSnafu).flatten()) + .collect::>>()?; + + Ok(Status::Done) + } +} + +#[async_trait] +impl Procedure for DropTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + match self.data.state { + DropTableState::RemoveMetadata => self.on_remove_metadata().await, + DropTableState::InvalidateTableCache => self.on_broadcast().await, + DropTableState::DatanodeDropTable => self.on_datanode_drop_table().await, + } + .map_err(handle_retry_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = &self.data.table_ref(); + let key = common_catalog::format_full_table_name( + table_ref.catalog, + table_ref.schema, + table_ref.table, + ); + + LockKey::single(key) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DropTableData { + state: DropTableState, + cluster_id: u64, + task: DropTableTask, + table_route: TableRoute, +} + +impl DropTableData { + pub fn new(cluster_id: u64, task: DropTableTask, table_route: TableRoute) -> Self { + Self { + state: DropTableState::RemoveMetadata, + cluster_id, + task, + table_route, + } + } + + fn table_ref(&self) -> TableReference { + self.task.table_ref() + } + + fn table_name(&self) -> TableName { + self.task.table_name() + } +} + +#[derive(Debug, Serialize, Deserialize)] +enum DropTableState { + /// Removes metadata + RemoveMetadata, + /// Invalidates Table Cache + InvalidateTableCache, + /// Datanode drops the table + DatanodeDropTable, +} diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs new file mode 100644 index 0000000000..113c75e718 --- /dev/null +++ b/src/meta-srv/src/procedure/utils.rs @@ -0,0 +1,81 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::TableRouteValue; +use catalog::helper::TableGlobalKey; +use common_meta::key::TableRouteKey; +use common_meta::peer::Peer; +use common_meta::rpc::router::TableRoute; +use common_procedure::error::Error as ProcedureError; +use snafu::{location, Location, ResultExt}; +use table::engine::TableReference; +use table::metadata::TableId; + +use crate::error::{self, Error, Result}; + +pub fn build_table_route_value(table_route: TableRoute) -> Result { + let (peers, table_route) = table_route + .try_into_raw() + .context(error::ConvertProtoDataSnafu)?; + + Ok(TableRouteValue { + peers, + table_route: Some(table_route), + }) +} + +pub fn build_table_metadata_key( + table_ref: TableReference<'_>, + table_id: TableId, +) -> (TableGlobalKey, TableRouteKey) { + let table_route_key = TableRouteKey { + table_id, + catalog_name: table_ref.catalog, + schema_name: table_ref.schema, + table_name: table_ref.schema, + }; + + let table_global_key = TableGlobalKey { + catalog_name: table_ref.catalog.to_string(), + schema_name: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + }; + + (table_global_key, table_route_key) +} + +pub fn handle_request_datanode_error(datanode: Peer) -> impl FnOnce(client::error::Error) -> Error { + move |err| { + if matches!(err, client::error::Error::FlightGet { .. }) { + error::RetryLaterSnafu { + reason: format!("Failed to execute operation on datanode, source: {}", err), + } + .build() + } else { + error::Error::RequestDatanode { + location: location!(), + peer: datanode, + source: err, + } + } + } +} + +pub fn handle_retry_error(e: Error) -> ProcedureError { + if matches!(e, error::Error::RetryLater { .. }) { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } +} diff --git a/src/meta-srv/src/service/ddl.rs b/src/meta-srv/src/service/ddl.rs index 878db48cf7..1d9cb57612 100644 --- a/src/meta-srv/src/service/ddl.rs +++ b/src/meta-srv/src/service/ddl.rs @@ -17,7 +17,8 @@ use api::v1::meta::{ Table, TableRoute, }; use api::v1::TableId; -use common_meta::rpc::ddl::{CreateTableTask, DdlTask}; +use common_meta::key::TableRouteKey; +use common_meta::rpc::ddl::{CreateTableTask, DdlTask, DropTableTask}; use common_meta::rpc::router; use common_meta::table_name::TableName; use common_telemetry::{info, warn}; @@ -25,11 +26,13 @@ use snafu::{OptionExt, ResultExt}; use table::metadata::RawTableInfo; use tonic::{Request, Response}; +use super::store::kv::KvStoreRef; use super::GrpcResult; use crate::ddl::DdlManagerRef; use crate::error::{self, Result}; use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef}; use crate::sequence::SequenceRef; +use crate::table_routes::get_table_route_value; #[async_trait::async_trait] impl ddl_task_server::DdlTask for MetaSrv { @@ -67,6 +70,15 @@ impl ddl_task_server::DdlTask for MetaSrv { ) .await? } + DdlTask::DropTable(drop_table_task) => { + handle_drop_table_task( + header.cluster_id, + drop_table_task, + self.kv_store().clone(), + self.ddl_manager().clone(), + ) + .await? + } }; Ok(Response::new(resp)) @@ -116,7 +128,7 @@ async fn handle_create_table_task( .submit_create_table_task(cluster_id, create_table_task, table_route) .await?; - info!("Table: {table_id} created via procedure_id {id:?}"); + info!("Table: {table_id} is dropped via procedure_id {id:?}"); Ok(SubmitDdlTaskResponse { key: id.to_string().into(), @@ -185,3 +197,42 @@ async fn handle_create_table_route( router::TableRoute::try_from_raw(&peers, table_route).context(error::TableRouteConversionSnafu) } + +async fn handle_drop_table_task( + cluster_id: u64, + drop_table_task: DropTableTask, + kv_store: KvStoreRef, + ddl_manager: DdlManagerRef, +) -> Result { + let table_id = drop_table_task.table_id; + + let table_route_key = TableRouteKey { + table_id, + catalog_name: &drop_table_task.catalog, + schema_name: &drop_table_task.schema, + table_name: &drop_table_task.table, + }; + + let table_route_value = get_table_route_value(&kv_store, &table_route_key).await?; + + let table_route = router::TableRoute::try_from_raw( + &table_route_value.peers, + table_route_value + .table_route + .context(error::UnexpectedSnafu { + violated: "expected table_route", + })?, + ) + .context(error::TableRouteConversionSnafu)?; + + let id = ddl_manager + .submit_drop_table_task(cluster_id, drop_table_task, table_route) + .await?; + + info!("Table: {table_id} created via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: id.to_string().into(), + ..Default::default() + }) +} diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 2fc4e12b67..7837fc8e3b 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -34,11 +34,10 @@ use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef}; use crate::metrics::METRIC_META_ROUTE_REQUEST; use crate::sequence::SequenceRef; use crate::service::store::ext::KvStoreExt; -use crate::service::store::kv::KvStoreRef; use crate::service::GrpcResult; use crate::table_routes::{ - get_table_global_value, get_table_route_value, remove_table_global_value, - remove_table_route_value, + fetch_tables, get_table_global_value, remove_table_global_value, remove_table_route_value, + table_route_key, }; #[async_trait::async_trait] @@ -337,7 +336,7 @@ async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result, -) -> Result> { - let mut tables = vec![]; - // Maybe we can optimize the for loop in the future, but in general, - // there won't be many keys, in fact, there is usually just one. - for tgk in keys { - let tgv = get_table_global_value(kv_store, &tgk).await?; - if tgv.is_none() { - warn!("Table global value is absent: {}", tgk); - continue; - } - let tgv = tgv.unwrap(); - - let trk = table_route_key(tgv.table_id() as u64, &tgk); - let trv = get_table_route_value(kv_store, &trk).await?; - - tables.push((tgv, trv)); - } - - Ok(tables) -} - -pub(crate) fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> { - TableRouteKey { - table_id: table_id as _, - catalog_name: &t.catalog_name, - schema_name: &t.schema_name, - table_name: &t.table_name, - } -} diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 8c130c3bdb..111ad30014 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -18,7 +18,9 @@ use api::v1::meta::{MoveValueRequest, PutRequest, TableRouteValue}; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_meta::key::TableRouteKey; use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse}; +use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; +use table::engine::TableReference; use crate::error::{ ConvertProtoDataSnafu, DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, @@ -158,6 +160,61 @@ async fn move_value( Ok(res.kv.map(|kv| (kv.key, kv.value))) } +pub(crate) fn table_route_key(table_id: u32, t: &TableGlobalKey) -> TableRouteKey<'_> { + TableRouteKey { + table_id, + catalog_name: &t.catalog_name, + schema_name: &t.schema_name, + table_name: &t.table_name, + } +} + +pub(crate) async fn fetch_table( + kv_store: &KvStoreRef, + table_ref: TableReference<'_>, +) -> Result> { + let tgk = TableGlobalKey { + catalog_name: table_ref.catalog.to_string(), + schema_name: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + }; + + let tgv = get_table_global_value(kv_store, &tgk).await?; + + if let Some(tgv) = tgv { + let trk = table_route_key(tgv.table_id(), &tgk); + let trv = get_table_route_value(kv_store, &trk).await?; + + return Ok(Some((tgv, trv))); + } + + Ok(None) +} + +pub(crate) async fn fetch_tables( + kv_store: &KvStoreRef, + keys: impl Iterator, +) -> Result> { + let mut tables = vec![]; + // Maybe we can optimize the for loop in the future, but in general, + // there won't be many keys, in fact, there is usually just one. + for tgk in keys { + let tgv = get_table_global_value(kv_store, &tgk).await?; + if tgv.is_none() { + warn!("Table global value is absent: {}", tgk); + continue; + } + let tgv = tgv.unwrap(); + + let trk = table_route_key(tgv.table_id(), &tgk); + let trv = get_table_route_value(kv_store, &trk).await?; + + tables.push((tgv, trv)); + } + + Ok(tables) +} + #[cfg(test)] pub(crate) mod tests { use std::collections::HashMap;