feat: switch to using drop table procedure (#1901)

* feat: switch to using drop table procedure

* chore: remove unused attributes

* feat: register the drop table procedure loader

* fix: fix typo
This commit is contained in:
Weny Xu
2023-07-12 11:35:23 +09:00
committed by GitHub
parent 264c5ea720
commit a751aa5ba0
7 changed files with 77 additions and 55 deletions

View File

@@ -44,6 +44,20 @@ impl DdlTask {
) -> Self {
DdlTask::CreateTable(CreateTableTask::new(expr, partitions, table_info))
}
pub fn new_drop_table(
catalog: String,
schema: String,
table: String,
table_id: TableId,
) -> Self {
DdlTask::DropTable(DropTableTask {
catalog,
schema,
table,
table_id,
})
}
}
impl TryFrom<Task> for DdlTask {
@@ -92,19 +106,17 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
pub struct SubmitDdlTaskResponse {
pub key: Vec<u8>,
pub table_id: TableId,
pub table_id: Option<TableId>,
}
impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {
type Error = error::Error;
fn try_from(resp: PbSubmitDdlTaskResponse) -> Result<Self> {
let table_id = resp.table_id.context(error::InvalidProtoMsgSnafu {
err_msg: "expected table_id",
})?;
let table_id = resp.table_id.map(|t| t.id);
Ok(Self {
key: resp.key,
table_id: table_id.id,
table_id,
})
}
}

View File

@@ -25,6 +25,12 @@ use store_api::storage::RegionNumber;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Unexpected, violated: {}", violated))]
Unexpected {
violated: String,
location: Location,
},
#[snafu(display("Execute the operation timeout, source: {}", source))]
Timeout {
location: Location,
@@ -652,7 +658,8 @@ impl ErrorExt for Error {
| Error::BuildParquetRecordBatchStream { .. }
| Error::ReadRecordBatch { .. }
| Error::BuildFileStream { .. }
| Error::WriteStreamToFile { .. } => StatusCode::Unexpected,
| Error::WriteStreamToFile { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
Error::Catalog { source, .. } => source.status_code(),
Error::CatalogEntrySerde { source, .. } => source.status_code(),

View File

@@ -23,11 +23,11 @@ use api::v1::ddl_request::{Expr as DdlExpr, Expr};
use api::v1::greptime_request::Request;
use api::v1::{
column_def, AlterExpr, CompactTableExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest,
DropTableExpr, FlushTableExpr, InsertRequests, TableId,
FlushTableExpr, InsertRequests,
};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue};
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
use catalog::{CatalogManager, RegisterTableRequest};
use chrono::DateTime;
use client::client_manager::DatanodeClients;
use client::Database;
@@ -36,9 +36,7 @@ use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
use common_meta::peer::Peer;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{
DeleteRequest as MetaDeleteRequest, Partition as MetaPartition, RouteRequest,
};
use common_meta::rpc::router::{Partition as MetaPartition, RouteRequest};
use common_meta::rpc::store::CompareAndPutRequest;
use common_meta::table_name::TableName;
use common_query::Output;
@@ -61,7 +59,7 @@ use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use store_api::storage::RegionNumber;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};
use table::requests::TableOptions;
use table::table::AlterContext;
use table::TableRef;
@@ -119,12 +117,15 @@ impl DistInstance {
.create_table_procedure(create_table, partitions, table_info.clone())
.await?;
let table_id = resp.table_id;
let table_id = resp.table_id.context(error::UnexpectedSnafu {
violated: "expected table_id",
})?;
info!("Successfully created distributed table '{table_name}' with table id {table_id}");
table_info.ident.table_id = table_id;
let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?);
create_table.table_id = Some(TableId { id: table_id });
create_table.table_id = Some(api::v1::TableId { id: table_id });
let table = Arc::new(DistTable::new(
table_name.clone(),
@@ -165,7 +166,7 @@ impl DistInstance {
}
async fn drop_table(&self, table_name: TableName) -> Result<Output> {
let _ = self
let table = self
.catalog_manager
.table(
&table_name.catalog_name,
@@ -178,42 +179,9 @@ impl DistInstance {
table_name: table_name.to_string(),
})?;
let route_response = self
.meta_client
.delete_route(MetaDeleteRequest {
table_name: table_name.clone(),
})
.await
.context(RequestMetaSnafu)?;
let table_id = table.table_info().ident.table_id;
let request = DeregisterTableRequest {
catalog: table_name.catalog_name.clone(),
schema: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
};
self.catalog_manager
.deregister_table(request)
.await
.context(CatalogSnafu)?;
let expr = DropTableExpr {
catalog_name: table_name.catalog_name.clone(),
schema_name: table_name.schema_name.clone(),
table_name: table_name.table_name.clone(),
..Default::default()
};
for table_route in route_response.table_routes.iter() {
for datanode in table_route.find_leaders() {
debug!("Dropping table {table_name} on Datanode {datanode:?}");
let client = self.datanode_clients.get_client(&datanode).await;
let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
let _ = client
.drop_table(expr.clone())
.await
.context(RequestDatanodeSnafu)?;
}
}
self.drop_table_procedure(&table_name, table_id).await?;
// Since the table information dropped on meta does not go through KvBackend, so we
// manually invalidate the cache here.
@@ -547,6 +515,30 @@ impl DistInstance {
.context(error::RequestMetaSnafu)
}
async fn drop_table_procedure(
&self,
table_name: &TableName,
table_id: TableId,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
task: DdlTask::new_drop_table(
table_name.catalog_name.to_string(),
table_name.schema_name.to_string(),
table_name.table_name.to_string(),
table_id,
),
};
timeout(
// TODO(weny): makes timeout configurable.
Duration::from_secs(10),
self.meta_client.submit_ddl_task(request),
)
.await
.context(error::TimeoutSnafu)?
.context(error::RequestMetaSnafu)
}
async fn handle_dist_insert(
&self,
requests: InsertRequests,

View File

@@ -67,8 +67,7 @@ impl Client {
}
#[derive(Debug)]
// TODO(weny): removes this in following PRs.
#[allow(unused)]
struct Inner {
id: Id,
role: Role,

View File

@@ -83,6 +83,20 @@ impl DdlManager {
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: CreateTableProcedure::TYPE_NAME,
})?;
let context = self.create_context();
self.procedure_manager
.register_loader(
DropTableProcedure::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
DropTableProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: DropTableProcedure::TYPE_NAME,
})
}

View File

@@ -47,8 +47,6 @@ pub struct DropTableProcedure {
data: DropTableData,
}
// TODO(weny): removes in following PRs.
#[allow(unused)]
impl DropTableProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";

View File

@@ -43,7 +43,7 @@ pub fn build_table_metadata_key(
table_id,
catalog_name: table_ref.catalog,
schema_name: table_ref.schema,
table_name: table_ref.schema,
table_name: table_ref.table,
};
let table_global_key = TableGlobalKey {