From d988b43996f3ee57d97cec30382bc08369d0224c Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 14 Apr 2023 13:09:38 +0800 Subject: [PATCH] feat: Add drop table procedure to mito (#1377) * feat: Add drop table procedure to mito * feat: remove table from engine and then close it --- src/mito/src/engine.rs | 15 +- src/mito/src/engine/procedure.rs | 5 +- src/mito/src/engine/procedure/alter.rs | 6 +- src/mito/src/engine/procedure/drop.rs | 228 +++++++++++++++++++++++++ src/mito/src/table/test_util.rs | 22 ++- src/table/src/engine.rs | 11 +- src/table/src/requests.rs | 2 +- 7 files changed, 274 insertions(+), 15 deletions(-) create mode 100644 src/mito/src/engine/procedure/drop.rs diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 3fffcb2dc4..339a189ad3 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -48,7 +48,7 @@ use table::{error as table_error, Result as TableResult, Table}; use tokio::sync::Mutex; use crate::config::EngineConfig; -use crate::engine::procedure::{AlterMitoTable, CreateMitoTable}; +use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable}; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu, @@ -185,6 +185,19 @@ impl TableEngineProcedure for MitoEngine { ); Ok(procedure) } + + fn drop_table_procedure( + &self, + _ctx: &EngineContext, + request: DropTableRequest, + ) -> TableResult { + let procedure = Box::new( + DropMitoTable::new(request, self.inner.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?, + ); + Ok(procedure) + } } pub(crate) struct MitoEngineInner { diff --git a/src/mito/src/engine/procedure.rs b/src/mito/src/engine/procedure.rs index 353c63e824..df8c55459d 100644 --- a/src/mito/src/engine/procedure.rs +++ b/src/mito/src/engine/procedure.rs @@ -14,12 +14,14 @@ mod alter; mod create; +mod drop; use std::sync::Arc; pub(crate) use alter::AlterMitoTable; use common_procedure::ProcedureManager; pub(crate) use create::CreateMitoTable; +pub(crate) use drop::DropMitoTable; use store_api::storage::StorageEngine; use crate::engine::MitoEngineInner; @@ -34,7 +36,8 @@ pub(crate) fn register_procedure_loaders( ) { // The procedure names are expected to be unique, so we just panic on error. CreateMitoTable::register_loader(engine_inner.clone(), procedure_manager); - AlterMitoTable::register_loader(engine_inner, procedure_manager); + AlterMitoTable::register_loader(engine_inner.clone(), procedure_manager); + DropMitoTable::register_loader(engine_inner, procedure_manager); } #[cfg(test)] diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs index 6f935e8e21..1527d8c11b 100644 --- a/src/mito/src/engine/procedure/alter.rs +++ b/src/mito/src/engine/procedure/alter.rs @@ -371,7 +371,7 @@ mod tests { let TestEnv { table_engine, dir: _dir, - } = procedure_test_util::setup_test_engine("create_procedure").await; + } = procedure_test_util::setup_test_engine("add_column").await; let schema = Arc::new(test_util::schema_for_test()); let request = test_util::new_create_request(schema.clone()); @@ -426,7 +426,7 @@ mod tests { let TestEnv { table_engine, dir: _dir, - } = procedure_test_util::setup_test_engine("create_procedure").await; + } = procedure_test_util::setup_test_engine("drop_column").await; let schema = Arc::new(test_util::schema_for_test()); let request = test_util::new_create_request(schema.clone()); @@ -491,7 +491,7 @@ mod tests { let TestEnv { table_engine, dir: _dir, - } = procedure_test_util::setup_test_engine("create_procedure").await; + } = procedure_test_util::setup_test_engine("rename").await; let schema = Arc::new(test_util::schema_for_test()); let create_request = test_util::new_create_request(schema.clone()); diff --git a/src/mito/src/engine/procedure/drop.rs b/src/mito/src/engine/procedure/drop.rs new file mode 100644 index 0000000000..2f92ce0098 --- /dev/null +++ b/src/mito/src/engine/procedure/drop.rs @@ -0,0 +1,228 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status}; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::StorageEngine; +use table::engine::TableReference; +use table::requests::DropTableRequest; +use table::Table; + +use crate::engine::MitoEngineInner; +use crate::error::TableNotFoundSnafu; +use crate::table::MitoTable; + +/// Procedure to drop a [MitoTable]. +pub(crate) struct DropMitoTable { + data: DropTableData, + engine_inner: Arc>, + table: Arc>, +} + +#[async_trait] +impl Procedure for DropMitoTable { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + match self.data.state { + DropTableState::Prepare => self.on_prepare(), + DropTableState::CloseRegions => self.on_close_regions().await, + } + } + + fn dump(&self) -> Result { + let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?; + Ok(json) + } + + fn lock_key(&self) -> LockKey { + let table_ref = self.data.table_ref(); + let info = self.table.table_info(); + let keys = info + .meta + .region_numbers + .iter() + .map(|number| format!("{table_ref}/region-{number}")); + LockKey::new(keys) + } +} + +impl DropMitoTable { + const TYPE_NAME: &str = "mito::DropMitoTable"; + + /// Returns a new [DropMitoTable]. + pub(crate) fn new( + request: DropTableRequest, + engine_inner: Arc>, + ) -> Result { + let data = DropTableData { + state: DropTableState::Prepare, + request, + }; + let table_ref = data.table_ref(); + let table = + engine_inner + .get_mito_table(&table_ref) + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + Ok(DropMitoTable { + data, + engine_inner, + table, + }) + } + + /// Register the loader of this procedure to the `procedure_manager`. + /// + /// # Panics + /// Panics on error. + pub(crate) fn register_loader( + engine_inner: Arc>, + procedure_manager: &dyn ProcedureManager, + ) { + procedure_manager + .register_loader( + Self::TYPE_NAME, + Box::new(move |data| { + Self::from_json(data, engine_inner.clone()).map(|p| Box::new(p) as _) + }), + ) + .unwrap() + } + + /// Recover the procedure from json. + fn from_json(json: &str, engine_inner: Arc>) -> Result { + let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + let table_ref = data.table_ref(); + let table = + engine_inner + .get_mito_table(&table_ref) + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + Ok(DropMitoTable { + data, + engine_inner, + table, + }) + } + + /// Prepare table info. + fn on_prepare(&mut self) -> Result { + self.data.state = DropTableState::CloseRegions; + + Ok(Status::executing(true)) + } + + /// Close all regions. + async fn on_close_regions(&mut self) -> Result { + // Remove the table from the engine to avoid further access from users. + let table_ref = self.data.table_ref(); + self.engine_inner + .tables + .write() + .unwrap() + .remove(&table_ref.to_string()); + + // Close the table to close all regions. Closing a region is idempotent. + self.table.close().await.map_err(Error::from_error_ext)?; + + // TODO(yingwen): Currently, DROP TABLE doesn't remove data. We can + // write a drop meta update to the table and remove all files in the + // background. + Ok(Status::Done) + } +} + +/// Represents each step while dropping table in the mito engine. +#[derive(Debug, Serialize, Deserialize)] +enum DropTableState { + /// Prepare to drop table. + Prepare, + /// Close regions of this table. + CloseRegions, +} + +/// Serializable data of [DropMitoTable]. +#[derive(Debug, Serialize, Deserialize)] +struct DropTableData { + state: DropTableState, + request: DropTableRequest, +} + +impl DropTableData { + fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.request.catalog_name, + schema: &self.request.schema_name, + table: &self.request.table_name, + } + } +} + +#[cfg(test)] +mod tests { + use table::engine::{EngineContext, TableEngine, TableEngineProcedure}; + + use super::*; + use crate::engine::procedure::procedure_test_util::{self, TestEnv}; + use crate::table::test_util; + + #[tokio::test] + async fn test_procedure_drop_table() { + common_telemetry::init_default_ut_logging(); + + let TestEnv { + table_engine, + dir: _dir, + } = procedure_test_util::setup_test_engine("add_column").await; + let schema = Arc::new(test_util::schema_for_test()); + let request = test_util::new_create_request(schema.clone()); + + let engine_ctx = EngineContext::default(); + // Create table first. + let mut procedure = table_engine + .create_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Drop the table. + let request = test_util::new_drop_request(); + let mut procedure = table_engine + .drop_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // The table is dropped. + let table_ref = TableReference { + catalog: &request.catalog_name, + schema: &request.schema_name, + table: &request.table_name, + }; + assert!(table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .is_none()); + } +} diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index ff908f3f0c..dcde46ba03 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -30,7 +30,7 @@ use storage::EngineImpl; use table::engine::{EngineContext, TableEngine}; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::{ - AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest, TableOptions, + AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions, }; use table::{Table, TableRef}; @@ -91,8 +91,8 @@ pub fn build_test_table_info() -> TableInfo { .ident(0) .table_version(0u64) .table_type(TableType::Base) - .catalog_name("greptime".to_string()) - .schema_name("public".to_string()) + .catalog_name(DEFAULT_CATALOG_NAME.to_string()) + .schema_name(DEFAULT_SCHEMA_NAME.to_string()) .build() .unwrap() } @@ -108,8 +108,8 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { CreateTableRequest { id: 1, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), desc: Some("a test table".to_string()), schema: RawSchema::from(&*schema), @@ -123,13 +123,21 @@ pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest { AlterTableRequest { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: TABLE_NAME.to_string(), alter_kind, } } +pub fn new_drop_request() -> DropTableRequest { + DropTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TABLE_NAME.to_string(), + } +} + pub struct TestEngineComponents { pub table_engine: MitoEngine>, pub storage_engine: EngineImpl, diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index a240869e41..07fcaf5a4f 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -116,19 +116,26 @@ pub struct EngineContext {} /// Procedures for table engine. pub trait TableEngineProcedure: Send + Sync { - /// Returns a procedure that creates table by specific `request`. + /// Returns a procedure that creates a table by specific `request`. fn create_table_procedure( &self, ctx: &EngineContext, request: CreateTableRequest, ) -> Result; - /// Returns a procedure that alters table by specific `request`. + /// Returns a procedure that alters a table by specific `request`. fn alter_table_procedure( &self, ctx: &EngineContext, request: AlterTableRequest, ) -> Result; + + /// Returns a procedure that drops a table by specific `request`. + fn drop_table_procedure( + &self, + ctx: &EngineContext, + request: DropTableRequest, + ) -> Result; } pub type TableEngineProcedureRef = Arc; diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 18c92b7a04..0dd85ffe24 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -187,7 +187,7 @@ pub enum AlterKind { } /// Drop table request -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DropTableRequest { pub catalog_name: String, pub schema_name: String,