From a751aa5ba0bdd68d9df3109edbb3f104473c5887 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 12 Jul 2023 11:35:23 +0900 Subject: [PATCH] feat: switch to using drop table procedure (#1901) * feat: switch to using drop table procedure * chore: remove unused attributes * feat: register the drop table procedure loader * fix: fix typo --- src/common/meta/src/rpc/ddl.rs | 22 +++++-- src/frontend/src/error.rs | 9 ++- src/frontend/src/instance/distributed.rs | 80 +++++++++++------------- src/meta-client/src/client/ddl.rs | 3 +- src/meta-srv/src/ddl.rs | 14 +++++ src/meta-srv/src/procedure/drop_table.rs | 2 - src/meta-srv/src/procedure/utils.rs | 2 +- 7 files changed, 77 insertions(+), 55 deletions(-) diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 96913cfe15..dce9c2f24d 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -44,6 +44,20 @@ impl DdlTask { ) -> Self { DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info)) } + + pub fn new_drop_table( + catalog: String, + schema: String, + table: String, + table_id: TableId, + ) -> Self { + DdlTask::DropTable(DropTableTask { + catalog, + schema, + table, + table_id, + }) + } } impl TryFrom for DdlTask { @@ -92,19 +106,17 @@ impl TryFrom for PbSubmitDdlTaskRequest { pub struct SubmitDdlTaskResponse { pub key: Vec, - pub table_id: TableId, + pub table_id: Option, } impl TryFrom for SubmitDdlTaskResponse { type Error = error::Error; fn try_from(resp: PbSubmitDdlTaskResponse) -> Result { - let table_id = resp.table_id.context(error::InvalidProtoMsgSnafu { - err_msg: "expected table_id", - })?; + let table_id = resp.table_id.map(|t| t.id); Ok(Self { key: resp.key, - table_id: table_id.id, + table_id, }) } } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index df460fde17..c942d6fcd4 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -25,6 +25,12 @@ use store_api::storage::RegionNumber; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Unexpected, violated: {}", violated))] + Unexpected { + violated: String, + location: Location, + }, + #[snafu(display("Execute the operation timeout, source: {}", source))] Timeout { location: Location, @@ -652,7 +658,8 @@ impl ErrorExt for Error { | Error::BuildParquetRecordBatchStream { .. } | Error::ReadRecordBatch { .. } | Error::BuildFileStream { .. } - | Error::WriteStreamToFile { .. } => StatusCode::Unexpected, + | Error::WriteStreamToFile { .. } + | Error::Unexpected { .. } => StatusCode::Unexpected, Error::Catalog { source, .. } => source.status_code(), Error::CatalogEntrySerde { source, .. } => source.status_code(), diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 0456254a94..d91776710f 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -23,11 +23,11 @@ use api::v1::ddl_request::{Expr as DdlExpr, Expr}; use api::v1::greptime_request::Request; use api::v1::{ column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, - DropTableExpr, FlushTableExpr, InsertRequests, TableId, + FlushTableExpr, InsertRequests, }; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue}; -use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest}; +use catalog::{CatalogManager, RegisterTableRequest}; use chrono::DateTime; use client::client_manager::DatanodeClients; use client::Database; @@ -36,9 +36,7 @@ use common_catalog::format_full_table_name; use common_error::prelude::BoxedError; use common_meta::peer::Peer; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; -use common_meta::rpc::router::{ - DeleteRequest as MetaDeleteRequest, Partition as MetaPartition, RouteRequest, -}; +use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest}; use common_meta::rpc::store::CompareAndPutRequest; use common_meta::table_name::TableName; use common_query::Output; @@ -61,7 +59,7 @@ use sql::statements::statement::Statement; use sql::statements::{self, sql_value_to_value}; use store_api::storage::RegionNumber; use table::engine::TableReference; -use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; +use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; use table::requests::TableOptions; use table::table::AlterContext; use table::TableRef; @@ -119,12 +117,15 @@ impl DistInstance { .create_table_procedure(create_table, partitions, table_info.clone()) .await?; - let table_id = resp.table_id; + let table_id = resp.table_id.context(error::UnexpectedSnafu { + violated: "expected table_id", + })?; info!("Successfully created distributed table '{table_name}' with table id {table_id}"); + table_info.ident.table_id = table_id; let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?); - create_table.table_id = Some(TableId { id: table_id }); + create_table.table_id = Some(api::v1::TableId { id: table_id }); let table = Arc::new(DistTable::new( table_name.clone(), @@ -165,7 +166,7 @@ impl DistInstance { } async fn drop_table(&self, table_name: TableName) -> Result { - let _ = self + let table = self .catalog_manager .table( &table_name.catalog_name, @@ -178,42 +179,9 @@ impl DistInstance { table_name: table_name.to_string(), })?; - let route_response = self - .meta_client - .delete_route(MetaDeleteRequest { - table_name: table_name.clone(), - }) - .await - .context(RequestMetaSnafu)?; + let table_id = table.table_info().ident.table_id; - let request = DeregisterTableRequest { - catalog: table_name.catalog_name.clone(), - schema: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - }; - self.catalog_manager - .deregister_table(request) - .await - .context(CatalogSnafu)?; - - let expr = DropTableExpr { - catalog_name: table_name.catalog_name.clone(), - schema_name: table_name.schema_name.clone(), - table_name: table_name.table_name.clone(), - ..Default::default() - }; - for table_route in route_response.table_routes.iter() { - for datanode in table_route.find_leaders() { - debug!("Dropping table {table_name} on Datanode {datanode:?}"); - - let client = self.datanode_clients.get_client(&datanode).await; - let client = Database::new(&expr.catalog_name, &expr.schema_name, client); - let _ = client - .drop_table(expr.clone()) - .await - .context(RequestDatanodeSnafu)?; - } - } + self.drop_table_procedure(&table_name, table_id).await?; // Since the table information dropped on meta does not go through KvBackend, so we // manually invalidate the cache here. @@ -547,6 +515,30 @@ impl DistInstance { .context(error::RequestMetaSnafu) } + async fn drop_table_procedure( + &self, + table_name: &TableName, + table_id: TableId, + ) -> Result { + let request = SubmitDdlTaskRequest { + task: DdlTask::new_drop_table( + table_name.catalog_name.to_string(), + table_name.schema_name.to_string(), + table_name.table_name.to_string(), + table_id, + ), + }; + + timeout( + // TODO(weny): makes timeout configurable. + Duration::from_secs(10), + self.meta_client.submit_ddl_task(request), + ) + .await + .context(error::TimeoutSnafu)? + .context(error::RequestMetaSnafu) + } + async fn handle_dist_insert( &self, requests: InsertRequests, diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/ddl.rs index 90191dd519..8e5fda721d 100644 --- a/src/meta-client/src/client/ddl.rs +++ b/src/meta-client/src/client/ddl.rs @@ -67,8 +67,7 @@ impl Client { } #[derive(Debug)] -// TODO(weny): removes this in following PRs. -#[allow(unused)] + struct Inner { id: Id, role: Role, diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index e165617ae3..e7e07b66ba 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -83,6 +83,20 @@ impl DdlManager { ) .context(error::RegisterProcedureLoaderSnafu { type_name: CreateTableProcedure::TYPE_NAME, + })?; + + let context = self.create_context(); + + self.procedure_manager + .register_loader( + DropTableProcedure::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + DropTableProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }), + ) + .context(error::RegisterProcedureLoaderSnafu { + type_name: DropTableProcedure::TYPE_NAME, }) } diff --git a/src/meta-srv/src/procedure/drop_table.rs b/src/meta-srv/src/procedure/drop_table.rs index b54f491293..31ea217fa7 100644 --- a/src/meta-srv/src/procedure/drop_table.rs +++ b/src/meta-srv/src/procedure/drop_table.rs @@ -47,8 +47,6 @@ pub struct DropTableProcedure { data: DropTableData, } -// TODO(weny): removes in following PRs. -#[allow(unused)] impl DropTableProcedure { pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::DropTable"; diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 113c75e718..3823135169 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -43,7 +43,7 @@ pub fn build_table_metadata_key( table_id, catalog_name: table_ref.catalog, schema_name: table_ref.schema, - table_name: table_ref.schema, + table_name: table_ref.table, }; let table_global_key = TableGlobalKey {