feat: implement drop table procedure (#1872)

* feat: implement drop table procedure

* fix: fix uncaught error

* refactor: refactor error handling

* chore: apply suggestions from CR

* refactor: move fetch_table/s to table_routes.rs

* chore: fix clippy

* chore: apply suggestions from CR

* chore: rebase onto develop

* feat: compare the table_route value before deleting

* feat: handle if table already exists on datanode

* Update src/meta-srv/src/procedure/drop_table.rs

Co-authored-by: JeremyHi <jiachun_feng@proton.me>

---------

Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
Weny Xu
2023-07-07 17:03:40 +09:00
committed by GitHub
parent ad165c1c64
commit 64acfd3802
13 changed files with 579 additions and 95 deletions

View File

@@ -15,7 +15,7 @@
mod client;
pub mod client_manager;
mod database;
mod error;
pub mod error;
pub mod load_balance;
mod metrics;
mod stream_insert;

View File

@@ -16,11 +16,11 @@ use std::result;
use api::v1::meta::submit_ddl_task_request::Task;
use api::v1::meta::{
CreateTableTask as PbCreateTableTask, Partition,
CreateTableTask as PbCreateTableTask, DropTableTask as PbDropTableTask, Partition,
SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
SubmitDdlTaskResponse as PbSubmitDdlTaskResponse,
};
use api::v1::CreateTableExpr;
use api::v1::{CreateTableExpr, DropTableExpr};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -33,6 +33,7 @@ use crate::table_name::TableName;
#[derive(Debug)]
pub enum DdlTask {
CreateTable(CreateTableTask),
DropTable(DropTableTask),
}
impl DdlTask {
@@ -52,6 +53,7 @@ impl TryFrom<Task> for DdlTask {
Task::CreateTableTask(create_table) => {
Ok(DdlTask::CreateTable(create_table.try_into()?))
}
Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
_ => todo!(),
}
}
@@ -71,6 +73,15 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
create_table: Some(task.create_table),
partitions: task.partitions,
}),
DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
}),
}),
};
Ok(Self {
header: None,
@@ -98,6 +109,54 @@ impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {
}
}
/// A drop-table DDL task identifying the target table by its fully-qualified
/// name and numeric id.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct DropTableTask {
    // Catalog the table belongs to.
    pub catalog: String,
    // Schema the table belongs to.
    pub schema: String,
    // Name of the table to drop.
    pub table: String,
    // Numeric id of the table to drop.
    pub table_id: TableId,
}
impl DropTableTask {
    /// Borrows this task's catalog/schema/table as a `TableReference`.
    pub fn table_ref(&self) -> TableReference {
        TableReference {
            catalog: self.catalog.as_str(),
            schema: self.schema.as_str(),
            table: self.table.as_str(),
        }
    }

    /// Builds an owned `TableName` from this task's name components.
    pub fn table_name(&self) -> TableName {
        TableName {
            catalog_name: self.catalog.clone(),
            schema_name: self.schema.clone(),
            table_name: self.table.clone(),
        }
    }
}
impl TryFrom<PbDropTableTask> for DropTableTask {
    type Error = error::Error;

    /// Converts the protobuf task, failing with `InvalidProtoMsg` when the
    /// required `drop_table` expression or its `table_id` field is absent.
    fn try_from(pb: PbDropTableTask) -> Result<Self> {
        let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
            err_msg: "expected drop table",
        })?;
        Ok(Self {
            catalog: drop_table.catalog_name,
            schema: drop_table.schema_name,
            table: drop_table.table_name,
            table_id: drop_table
                .table_id
                .context(error::InvalidProtoMsgSnafu {
                    err_msg: "expected table_id",
                })?
                .id,
        })
    }
}
#[derive(Debug, PartialEq)]
pub struct CreateTableTask {
pub create_table: CreateTableExpr,

View File

@@ -15,13 +15,15 @@
use std::sync::Arc;
use client::client_manager::DatanodeClients;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::ddl::{CreateTableTask, DropTableTask};
use common_meta::rpc::router::TableRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::procedure::create_table::CreateTableProcedure;
use crate::procedure::drop_table::DropTableProcedure;
use crate::service::mailbox::MailboxRef;
use crate::service::store::kv::KvStoreRef;
pub type DdlManagerRef = Arc<DdlManager>;
@@ -30,6 +32,8 @@ pub struct DdlManager {
procedure_manager: ProcedureManagerRef,
kv_store: KvStoreRef,
datanode_clients: Arc<DatanodeClients>,
mailbox: MailboxRef,
server_addr: String,
}
// TODO(weny): removes in following PRs.
@@ -38,6 +42,8 @@ pub struct DdlManager {
pub(crate) struct DdlContext {
pub(crate) kv_store: KvStoreRef,
pub(crate) datanode_clients: Arc<DatanodeClients>,
pub(crate) mailbox: MailboxRef,
pub(crate) server_addr: String,
}
impl DdlManager {
@@ -45,11 +51,15 @@ impl DdlManager {
procedure_manager: ProcedureManagerRef,
kv_store: KvStoreRef,
datanode_clients: Arc<DatanodeClients>,
mailbox: MailboxRef,
server_addr: String,
) -> Self {
Self {
procedure_manager,
kv_store,
datanode_clients,
mailbox,
server_addr,
}
}
@@ -57,6 +67,8 @@ impl DdlManager {
DdlContext {
kv_store: self.kv_store.clone(),
datanode_clients: self.datanode_clients.clone(),
mailbox: self.mailbox.clone(),
server_addr: self.server_addr.clone(),
}
}
@@ -92,6 +104,21 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
drop_table_task: DropTableTask,
table_route: TableRoute,
) -> Result<ProcedureId> {
let context = self.create_context();
let procedure = DropTableProcedure::new(cluster_id, drop_table_task, table_route, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
let procedure_id = procedure_with_id.id;

View File

@@ -14,15 +14,21 @@
use common_error::prelude::*;
use common_meta::peer::Peer;
use common_runtime::JoinError;
use snafu::Location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::error::TryRecvError;
use tonic::codegen::http;
use tonic::Code;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to join a future: {}", source))]
Join {
location: Location,
source: JoinError,
},
#[snafu(display("Failed to execute transaction: {}", msg))]
Txn { location: Location, msg: String },
@@ -37,12 +43,6 @@ pub enum Error {
found: u64,
},
#[snafu(display("Failed to receive status, source: {}", source,))]
TryReceiveStatus {
location: Location,
source: TryRecvError,
},
#[snafu(display(
"Failed to request Datanode, expected: {}, but only {} available",
expected,
@@ -467,7 +467,7 @@ impl ErrorExt for Error {
| Error::StartGrpc { .. }
| Error::Combine { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::TryReceiveStatus { .. } => StatusCode::Internal,
| Error::Join { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
| Error::MissingRequestHeader { .. }

View File

@@ -14,6 +14,7 @@
#![feature(async_closure)]
#![feature(btree_drain_filter)]
#![feature(result_flattening)]
pub mod bootstrap;
pub mod cluster;

View File

@@ -158,6 +158,17 @@ impl MetaSrvBuilder {
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
// TODO(weny): considers to modify the default config of procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
Arc::new(DatanodeClients::default()),
mailbox.clone(),
options.server_addr.clone(),
));
let _ = ddl_manager.try_start();
let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
@@ -212,15 +223,6 @@ impl MetaSrvBuilder {
}
};
// TODO(weny): considers to modify the default config of procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
Arc::new(DatanodeClients::default()),
));
let _ = ddl_manager.try_start();
Ok(MetaSrv {
started,
options,

View File

@@ -13,5 +13,7 @@
// limitations under the License.
pub mod create_table;
pub mod drop_table;
pub mod region_failover;
pub(crate) mod state_store;
mod utils;

View File

@@ -23,15 +23,14 @@ use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use table::metadata::TableId;
use super::utils::{handle_request_datanode_error, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::service::router::create_table_global_value;
@@ -212,14 +211,9 @@ impl CreateTableProcedure {
create_expr_for_region.region_numbers = regions;
joins.push(common_runtime::spawn_bg(async move {
if let Err(err) = client
.create(create_expr_for_region)
.await
.context(error::RequestDatanodeSnafu { peer: datanode })
{
// TODO(weny): add tests for `TableAlreadyExists`
if let Err(err) = client.create(create_expr_for_region).await {
if err.status_code() != StatusCode::TableAlreadyExists {
return Err(err);
return Err(handle_request_datanode_error(datanode)(err));
}
}
Ok(())
@@ -229,17 +223,7 @@ impl CreateTableProcedure {
let _ = join_all(joins)
.await
.into_iter()
.map(|result| {
result.map_err(|err| {
error::RetryLaterSnafu {
reason: format!(
"Failed to execute create table on datanode, source: {}",
err
),
}
.build()
})
})
.map(|e| e.context(error::JoinSnafu))
.collect::<Result<Vec<_>>>()?;
self.creator.data.state = CreateTableState::CreateMetadata;
@@ -255,22 +239,12 @@ impl Procedure for CreateTableProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let error_handler = |e| {
if matches!(e, error::Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
};
match self.creator.data.state {
CreateTableState::Prepare => self.on_prepare().await.map_err(error_handler),
CreateTableState::DatanodeCreateTable => {
self.on_datanode_create_table().await.map_err(error_handler)
}
CreateTableState::CreateMetadata => {
self.on_create_metadata().await.map_err(error_handler)
}
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateTable => self.on_datanode_create_table().await,
CreateTableState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -0,0 +1,264 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::MailboxMessage;
use api::v1::{DropTableExpr, TableId};
use async_trait::async_trait;
use client::Database;
use common_catalog::consts::MITO_ENGINE;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::ident::TableIdent;
use common_meta::instruction::Instruction;
use common_meta::rpc::ddl::DropTableTask;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::debug;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use super::utils::{build_table_metadata_key, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error;
use crate::error::Result;
use crate::procedure::utils::{build_table_route_value, handle_request_datanode_error};
use crate::service::mailbox::BroadcastChannel;
use crate::service::store::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::table_routes::fetch_table;
/// Procedure that drops a table: removes its metadata, broadcasts a cache
/// invalidation to frontends, then drops the table on the datanodes.
pub struct DropTableProcedure {
    // Shared handles (kv store, datanode clients, mailbox, server address).
    context: DdlContext,
    // Serializable state-machine data; persisted via `Procedure::dump`.
    data: DropTableData,
}
// TODO(weny): removes in following PRs.
#[allow(unused)]
impl DropTableProcedure {
    pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";

    /// Creates a procedure starting at the `RemoveMetadata` state.
    pub(crate) fn new(
        cluster_id: u64,
        task: DropTableTask,
        table_route: TableRoute,
        context: DdlContext,
    ) -> Self {
        Self {
            context,
            data: DropTableData::new(cluster_id, task, table_route),
        }
    }

    /// Restores a procedure from its serialized `DropTableData` (procedure
    /// recovery path).
    pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
        let data = serde_json::from_str(json).context(FromJsonSnafu)?;
        Ok(Self { context, data })
    }

    /// Removes the table metadata.
    async fn on_remove_metadata(&mut self) -> Result<Status> {
        let table_ref = self.data.table_ref();
        // If metadata not exists (might have already been removed).
        if fetch_table(&self.context.kv_store, table_ref)
            .await?
            .is_none()
        {
            // Nothing to delete; skip straight to cache invalidation.
            self.data.state = DropTableState::InvalidateTableCache;
            return Ok(Status::executing(true));
        }
        let table_ref = self.data.table_ref();
        let table_id = self.data.task.table_id;
        let (table_global_key, table_route_key) = build_table_metadata_key(table_ref, table_id);
        let table_route_value = build_table_route_value(self.data.table_route.clone())?;
        // To protect the potential resource leak issues.
        // We must compare the table route value, before deleting.
        let txn = Txn::new()
            .when(vec![Compare::with_value(
                table_route_key.to_string().into_bytes(),
                CompareOp::Equal,
                table_route_value.into(),
            )])
            .and_then(vec![
                TxnOp::Delete(table_route_key.to_string().into_bytes()),
                TxnOp::Delete(table_global_key.to_string().into_bytes()),
            ]);
        let resp = self.context.kv_store.txn(txn).await?;
        // A failed compare means the route changed concurrently; surface the
        // error instead of deleting blindly.
        ensure!(
            resp.succeeded,
            error::TxnSnafu {
                msg: "table_route_value changed"
            }
        );
        self.data.state = DropTableState::InvalidateTableCache;
        Ok(Status::executing(true))
    }

    /// Broadcasts invalidate table cache instruction.
    async fn on_broadcast(&mut self) -> Result<Status> {
        let table_name = self.data.table_name();
        let table_ident = TableIdent {
            catalog: table_name.catalog_name,
            schema: table_name.schema_name,
            table: table_name.table_name,
            table_id: self.data.task.table_id,
            // TODO(weny): retrieves the engine from the upper.
            engine: MITO_ENGINE.to_string(),
        };
        let instruction = Instruction::InvalidateTableCache(table_ident);
        let msg = &MailboxMessage::json_message(
            "Invalidate Table Cache by dropping table procedure",
            &format!("Metasrv@{}", self.context.server_addr),
            "Frontend broadcast",
            common_time::util::current_time_millis(),
            &instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: instruction.to_string(),
        })?;
        self.context
            .mailbox
            .broadcast(&BroadcastChannel::Frontend, msg)
            .await?;
        self.data.state = DropTableState::DatanodeDropTable;
        Ok(Status::executing(true))
    }

    /// Executes drop table instruction on datanode.
    async fn on_datanode_drop_table(&mut self) -> Result<Status> {
        let table_route = &self.data.table_route;
        let table_ref = self.data.table_ref();
        let table_id = self.data.task.table_id;
        let clients = self.context.datanode_clients.clone();
        let leaders = table_route.find_leaders();
        let mut joins = Vec::with_capacity(leaders.len());
        let expr = DropTableExpr {
            catalog_name: table_ref.catalog.to_string(),
            schema_name: table_ref.schema.to_string(),
            table_name: table_ref.table.to_string(),
            table_id: Some(TableId { id: table_id }),
        };
        // One background task per leader region holder.
        for datanode in leaders {
            debug!("Dropping table {table_ref} on Datanode {datanode:?}");
            let client = clients.get_client(&datanode).await;
            let client = Database::new(table_ref.catalog, table_ref.schema, client);
            let expr = expr.clone();
            joins.push(common_runtime::spawn_bg(async move {
                if let Err(err) = client.drop_table(expr).await {
                    // TODO(weny): add tests for `TableNotFound`
                    // A missing table on the datanode is tolerated — the drop
                    // may be a retry that already took effect.
                    if err.status_code() != StatusCode::TableNotFound {
                        return Err(handle_request_datanode_error(datanode)(err));
                    }
                }
                Ok(())
            }));
        }
        // Fail the step if any datanode task failed or panicked (JoinError).
        let _ = join_all(joins)
            .await
            .into_iter()
            .map(|e| e.context(error::JoinSnafu).flatten())
            .collect::<Result<Vec<_>>>()?;
        Ok(Status::Done)
    }
}
#[async_trait]
impl Procedure for DropTableProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Drives the state machine one step; errors are classified as retryable
    /// or fatal by `handle_retry_error`.
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            DropTableState::RemoveMetadata => self.on_remove_metadata().await,
            DropTableState::InvalidateTableCache => self.on_broadcast().await,
            DropTableState::DatanodeDropTable => self.on_datanode_drop_table().await,
        }
        .map_err(handle_retry_error)
    }

    /// Serializes the state-machine data so the procedure can be resumed.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Locks on the fully-qualified table name so concurrent DDL on the same
    /// table is serialized.
    fn lock_key(&self) -> LockKey {
        let table_ref = &self.data.table_ref();
        let key = common_catalog::format_full_table_name(
            table_ref.catalog,
            table_ref.schema,
            table_ref.table,
        );
        LockKey::single(key)
    }
}
/// Serializable state of a [`DropTableProcedure`].
#[derive(Debug, Serialize, Deserialize)]
pub struct DropTableData {
    // Current step of the drop-table state machine.
    state: DropTableState,
    cluster_id: u64,
    // The task being executed.
    task: DropTableTask,
    // Route snapshot captured at submission time; compared before deletion.
    table_route: TableRoute,
}
impl DropTableData {
    /// Creates the initial state, starting at `RemoveMetadata`.
    pub fn new(cluster_id: u64, task: DropTableTask, table_route: TableRoute) -> Self {
        Self {
            state: DropTableState::RemoveMetadata,
            cluster_id,
            task,
            table_route,
        }
    }

    // Delegates to the task's borrowed table reference.
    fn table_ref(&self) -> TableReference {
        self.task.table_ref()
    }

    // Delegates to the task's owned table name.
    fn table_name(&self) -> TableName {
        self.task.table_name()
    }
}
/// Steps of the drop-table state machine, executed in declaration order.
#[derive(Debug, Serialize, Deserialize)]
enum DropTableState {
    /// Removes metadata
    RemoveMetadata,
    /// Invalidates Table Cache
    InvalidateTableCache,
    /// Datanode drops the table
    DatanodeDropTable,
}

View File

@@ -0,0 +1,81 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::TableRouteValue;
use catalog::helper::TableGlobalKey;
use common_meta::key::TableRouteKey;
use common_meta::peer::Peer;
use common_meta::rpc::router::TableRoute;
use common_procedure::error::Error as ProcedureError;
use snafu::{location, Location, ResultExt};
use table::engine::TableReference;
use table::metadata::TableId;
use crate::error::{self, Error, Result};
/// Converts a `TableRoute` into its raw protobuf `TableRouteValue` form.
pub fn build_table_route_value(table_route: TableRoute) -> Result<TableRouteValue> {
    let (peers, raw_route) = table_route
        .try_into_raw()
        .context(error::ConvertProtoDataSnafu)?;
    let value = TableRouteValue {
        peers,
        table_route: Some(raw_route),
    };
    Ok(value)
}
/// Builds the two kv-store keys for a table's metadata: the global key and
/// the route key.
pub fn build_table_metadata_key(
    table_ref: TableReference<'_>,
    table_id: TableId,
) -> (TableGlobalKey, TableRouteKey) {
    let table_route_key = TableRouteKey {
        table_id,
        catalog_name: table_ref.catalog,
        schema_name: table_ref.schema,
        // Bug fix: previously `table_ref.schema`, which put the schema name in
        // the table-name slot — the route key then pointed at the wrong kv
        // entry, so the compare/delete txn in drop-table targeted a bogus key.
        table_name: table_ref.table,
    };
    let table_global_key = TableGlobalKey {
        catalog_name: table_ref.catalog.to_string(),
        schema_name: table_ref.schema.to_string(),
        table_name: table_ref.table.to_string(),
    };
    (table_global_key, table_route_key)
}
pub fn handle_request_datanode_error(datanode: Peer) -> impl FnOnce(client::error::Error) -> Error {
move |err| {
if matches!(err, client::error::Error::FlightGet { .. }) {
error::RetryLaterSnafu {
reason: format!("Failed to execute operation on datanode, source: {}", err),
}
.build()
} else {
error::Error::RequestDatanode {
location: location!(),
peer: datanode,
source: err,
}
}
}
}
pub fn handle_retry_error(e: Error) -> ProcedureError {
if matches!(e, error::Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
}

View File

@@ -17,7 +17,8 @@ use api::v1::meta::{
Table, TableRoute,
};
use api::v1::TableId;
use common_meta::rpc::ddl::{CreateTableTask, DdlTask};
use common_meta::key::TableRouteKey;
use common_meta::rpc::ddl::{CreateTableTask, DdlTask, DropTableTask};
use common_meta::rpc::router;
use common_meta::table_name::TableName;
use common_telemetry::{info, warn};
@@ -25,11 +26,13 @@ use snafu::{OptionExt, ResultExt};
use table::metadata::RawTableInfo;
use tonic::{Request, Response};
use super::store::kv::KvStoreRef;
use super::GrpcResult;
use crate::ddl::DdlManagerRef;
use crate::error::{self, Result};
use crate::metasrv::{MetaSrv, SelectorContext, SelectorRef};
use crate::sequence::SequenceRef;
use crate::table_routes::get_table_route_value;
#[async_trait::async_trait]
impl ddl_task_server::DdlTask for MetaSrv {
@@ -67,6 +70,15 @@ impl ddl_task_server::DdlTask for MetaSrv {
)
.await?
}
DdlTask::DropTable(drop_table_task) => {
handle_drop_table_task(
header.cluster_id,
drop_table_task,
self.kv_store().clone(),
self.ddl_manager().clone(),
)
.await?
}
};
Ok(Response::new(resp))
@@ -116,7 +128,7 @@ async fn handle_create_table_task(
.submit_create_table_task(cluster_id, create_table_task, table_route)
.await?;
info!("Table: {table_id} created via procedure_id {id:?}");
info!("Table: {table_id} is dropped via procedure_id {id:?}");
Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
@@ -185,3 +197,42 @@ async fn handle_create_table_route(
router::TableRoute::try_from_raw(&peers, table_route).context(error::TableRouteConversionSnafu)
}
/// Handles a drop-table DDL task: loads the table's current route from the kv
/// store and submits a `DropTableProcedure` for it.
async fn handle_drop_table_task(
    cluster_id: u64,
    drop_table_task: DropTableTask,
    kv_store: KvStoreRef,
    ddl_manager: DdlManagerRef,
) -> Result<SubmitDdlTaskResponse> {
    let table_id = drop_table_task.table_id;
    let table_route_key = TableRouteKey {
        table_id,
        catalog_name: &drop_table_task.catalog,
        schema_name: &drop_table_task.schema,
        table_name: &drop_table_task.table,
    };
    let table_route_value = get_table_route_value(&kv_store, &table_route_key).await?;
    let table_route = router::TableRoute::try_from_raw(
        &table_route_value.peers,
        table_route_value
            .table_route
            .context(error::UnexpectedSnafu {
                violated: "expected table_route",
            })?,
    )
    .context(error::TableRouteConversionSnafu)?;
    let id = ddl_manager
        .submit_drop_table_task(cluster_id, drop_table_task, table_route)
        .await?;

    // Bug fix: this is the drop path — the message previously said "created"
    // (swapped with the create-table handler's log line).
    info!("Table: {table_id} is dropped via procedure_id {id:?}");

    Ok(SubmitDdlTaskResponse {
        key: id.to_string().into(),
        ..Default::default()
    })
}

View File

@@ -34,11 +34,10 @@ use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef};
use crate::metrics::METRIC_META_ROUTE_REQUEST;
use crate::sequence::SequenceRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;
use crate::service::GrpcResult;
use crate::table_routes::{
get_table_global_value, get_table_route_value, remove_table_global_value,
remove_table_route_value,
fetch_tables, get_table_global_value, remove_table_global_value, remove_table_route_value,
table_route_key,
};
#[async_trait::async_trait]
@@ -337,7 +336,7 @@ async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result<RouteResponse
let _ = remove_table_global_value(&ctx.kv_store, &tgk).await?;
let trk = table_route_key(tgv.table_id() as u64, &tgk);
let trk = table_route_key(tgv.table_id(), &tgk);
let (_, trv) = remove_table_route_value(&ctx.kv_store, &trk).await?;
let (peers, table_routes) = fill_table_routes(vec![(tgv, trv)])?;
@@ -382,36 +381,3 @@ pub(crate) fn fill_table_routes(
Ok((peer_dict.into_peers(), table_routes))
}
async fn fetch_tables(
kv_store: &KvStoreRef,
keys: impl Iterator<Item = TableGlobalKey>,
) -> Result<Vec<(TableGlobalValue, TableRouteValue)>> {
let mut tables = vec![];
// Maybe we can optimize the for loop in the future, but in general,
// there won't be many keys, in fact, there is usually just one.
for tgk in keys {
let tgv = get_table_global_value(kv_store, &tgk).await?;
if tgv.is_none() {
warn!("Table global value is absent: {}", tgk);
continue;
}
let tgv = tgv.unwrap();
let trk = table_route_key(tgv.table_id() as u64, &tgk);
let trv = get_table_route_value(kv_store, &trk).await?;
tables.push((tgv, trv));
}
Ok(tables)
}
pub(crate) fn table_route_key(table_id: u64, t: &TableGlobalKey) -> TableRouteKey<'_> {
TableRouteKey {
table_id: table_id as _,
catalog_name: &t.catalog_name,
schema_name: &t.schema_name,
table_name: &t.table_name,
}
}

View File

@@ -18,7 +18,9 @@ use api::v1::meta::{MoveValueRequest, PutRequest, TableRouteValue};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::key::TableRouteKey;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use table::engine::TableReference;
use crate::error::{
ConvertProtoDataSnafu, DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result,
@@ -158,6 +160,61 @@ async fn move_value(
Ok(res.kv.map(|kv| (kv.key, kv.value)))
}
/// Builds a `TableRouteKey` that borrows its name parts from the given
/// `TableGlobalKey`.
pub(crate) fn table_route_key(table_id: u32, t: &TableGlobalKey) -> TableRouteKey<'_> {
    TableRouteKey {
        table_id,
        catalog_name: t.catalog_name.as_str(),
        schema_name: t.schema_name.as_str(),
        table_name: t.table_name.as_str(),
    }
}
/// Looks up a table's global value and route value by table reference.
/// Returns `None` when the table is absent from the kv store.
pub(crate) async fn fetch_table(
    kv_store: &KvStoreRef,
    table_ref: TableReference<'_>,
) -> Result<Option<(TableGlobalValue, TableRouteValue)>> {
    let tgk = TableGlobalKey {
        catalog_name: table_ref.catalog.to_string(),
        schema_name: table_ref.schema.to_string(),
        table_name: table_ref.table.to_string(),
    };
    match get_table_global_value(kv_store, &tgk).await? {
        Some(tgv) => {
            let trk = table_route_key(tgv.table_id(), &tgk);
            let trv = get_table_route_value(kv_store, &trk).await?;
            Ok(Some((tgv, trv)))
        }
        None => Ok(None),
    }
}
/// Fetches `(TableGlobalValue, TableRouteValue)` pairs for the given keys.
/// Keys whose global value is absent are skipped with a warning.
pub(crate) async fn fetch_tables(
    kv_store: &KvStoreRef,
    keys: impl Iterator<Item = TableGlobalKey>,
) -> Result<Vec<(TableGlobalValue, TableRouteValue)>> {
    let mut tables = vec![];
    // Maybe we can optimize the for loop in the future, but in general,
    // there won't be many keys, in fact, there is usually just one.
    for tgk in keys {
        // `let … else` replaces the previous `is_none()` + `unwrap()` pair.
        let Some(tgv) = get_table_global_value(kv_store, &tgk).await? else {
            warn!("Table global value is absent: {}", tgk);
            continue;
        };
        let trk = table_route_key(tgv.table_id(), &tgk);
        let trv = get_table_route_value(kv_store, &trk).await?;
        tables.push((tgv, trv));
    }
    Ok(tables)
}
#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;