feat: Add drop table procedure to mito (#1377)

* feat: Add drop table procedure to mito

* feat: remove table from engine and then close it
This commit is contained in:
Yingwen
2023-04-14 13:09:38 +08:00
committed by GitHub
parent 0fc816fb0c
commit d988b43996
7 changed files with 274 additions and 15 deletions

View File

@@ -48,7 +48,7 @@ use table::{error as table_error, Result as TableResult, Table};
use tokio::sync::Mutex;
use crate::config::EngineConfig;
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable};
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable};
use crate::error::{
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu,
@@ -185,6 +185,19 @@ impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
);
Ok(procedure)
}
fn drop_table_procedure(
&self,
_ctx: &EngineContext,
request: DropTableRequest,
) -> TableResult<BoxedProcedure> {
let procedure = Box::new(
DropMitoTable::new(request, self.inner.clone())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?,
);
Ok(procedure)
}
}
pub(crate) struct MitoEngineInner<S: StorageEngine> {

View File

@@ -14,12 +14,14 @@
mod alter;
mod create;
mod drop;
use std::sync::Arc;
pub(crate) use alter::AlterMitoTable;
use common_procedure::ProcedureManager;
pub(crate) use create::CreateMitoTable;
pub(crate) use drop::DropMitoTable;
use store_api::storage::StorageEngine;
use crate::engine::MitoEngineInner;
@@ -34,7 +36,8 @@ pub(crate) fn register_procedure_loaders<S: StorageEngine>(
) {
// The procedure names are expected to be unique, so we just panic on error.
CreateMitoTable::register_loader(engine_inner.clone(), procedure_manager);
AlterMitoTable::register_loader(engine_inner, procedure_manager);
AlterMitoTable::register_loader(engine_inner.clone(), procedure_manager);
DropMitoTable::register_loader(engine_inner, procedure_manager);
}
#[cfg(test)]

View File

@@ -371,7 +371,7 @@ mod tests {
let TestEnv {
table_engine,
dir: _dir,
} = procedure_test_util::setup_test_engine("create_procedure").await;
} = procedure_test_util::setup_test_engine("add_column").await;
let schema = Arc::new(test_util::schema_for_test());
let request = test_util::new_create_request(schema.clone());
@@ -426,7 +426,7 @@ mod tests {
let TestEnv {
table_engine,
dir: _dir,
} = procedure_test_util::setup_test_engine("create_procedure").await;
} = procedure_test_util::setup_test_engine("drop_column").await;
let schema = Arc::new(test_util::schema_for_test());
let request = test_util::new_create_request(schema.clone());
@@ -491,7 +491,7 @@ mod tests {
let TestEnv {
table_engine,
dir: _dir,
} = procedure_test_util::setup_test_engine("create_procedure").await;
} = procedure_test_util::setup_test_engine("rename").await;
let schema = Arc::new(test_util::schema_for_test());
let create_request = test_util::new_create_request(schema.clone());

View File

@@ -0,0 +1,228 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::StorageEngine;
use table::engine::TableReference;
use table::requests::DropTableRequest;
use table::Table;
use crate::engine::MitoEngineInner;
use crate::error::TableNotFoundSnafu;
use crate::table::MitoTable;
/// Procedure to drop a [MitoTable].
pub(crate) struct DropMitoTable<S: StorageEngine> {
data: DropTableData,
engine_inner: Arc<MitoEngineInner<S>>,
table: Arc<MitoTable<S::Region>>,
}
#[async_trait]
impl<S: StorageEngine> Procedure for DropMitoTable<S> {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
match self.data.state {
DropTableState::Prepare => self.on_prepare(),
DropTableState::CloseRegions => self.on_close_regions().await,
}
}
fn dump(&self) -> Result<String> {
let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?;
Ok(json)
}
fn lock_key(&self) -> LockKey {
let table_ref = self.data.table_ref();
let info = self.table.table_info();
let keys = info
.meta
.region_numbers
.iter()
.map(|number| format!("{table_ref}/region-{number}"));
LockKey::new(keys)
}
}
impl<S: StorageEngine> DropMitoTable<S> {
const TYPE_NAME: &str = "mito::DropMitoTable";
/// Returns a new [DropMitoTable].
pub(crate) fn new(
request: DropTableRequest,
engine_inner: Arc<MitoEngineInner<S>>,
) -> Result<Self> {
let data = DropTableData {
state: DropTableState::Prepare,
request,
};
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
Ok(DropMitoTable {
data,
engine_inner,
table,
})
}
/// Register the loader of this procedure to the `procedure_manager`.
///
/// # Panics
/// Panics on error.
pub(crate) fn register_loader(
engine_inner: Arc<MitoEngineInner<S>>,
procedure_manager: &dyn ProcedureManager,
) {
procedure_manager
.register_loader(
Self::TYPE_NAME,
Box::new(move |data| {
Self::from_json(data, engine_inner.clone()).map(|p| Box::new(p) as _)
}),
)
.unwrap()
}
/// Recover the procedure from json.
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let table_ref = data.table_ref();
let table =
engine_inner
.get_mito_table(&table_ref)
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
Ok(DropMitoTable {
data,
engine_inner,
table,
})
}
/// Prepare table info.
fn on_prepare(&mut self) -> Result<Status> {
self.data.state = DropTableState::CloseRegions;
Ok(Status::executing(true))
}
/// Close all regions.
async fn on_close_regions(&mut self) -> Result<Status> {
// Remove the table from the engine to avoid further access from users.
let table_ref = self.data.table_ref();
self.engine_inner
.tables
.write()
.unwrap()
.remove(&table_ref.to_string());
// Close the table to close all regions. Closing a region is idempotent.
self.table.close().await.map_err(Error::from_error_ext)?;
// TODO(yingwen): Currently, DROP TABLE doesn't remove data. We can
// write a drop meta update to the table and remove all files in the
// background.
Ok(Status::Done)
}
}
/// Represents each step while dropping table in the mito engine.
#[derive(Debug, Serialize, Deserialize)]
enum DropTableState {
/// Prepare to drop table.
Prepare,
/// Close regions of this table.
CloseRegions,
}
/// Serializable data of [DropMitoTable].
#[derive(Debug, Serialize, Deserialize)]
struct DropTableData {
state: DropTableState,
request: DropTableRequest,
}
impl DropTableData {
fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.request.catalog_name,
schema: &self.request.schema_name,
table: &self.request.table_name,
}
}
}
#[cfg(test)]
mod tests {
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
use super::*;
use crate::engine::procedure::procedure_test_util::{self, TestEnv};
use crate::table::test_util;
#[tokio::test]
async fn test_procedure_drop_table() {
common_telemetry::init_default_ut_logging();
let TestEnv {
table_engine,
dir: _dir,
} = procedure_test_util::setup_test_engine("add_column").await;
let schema = Arc::new(test_util::schema_for_test());
let request = test_util::new_create_request(schema.clone());
let engine_ctx = EngineContext::default();
// Create table first.
let mut procedure = table_engine
.create_table_procedure(&engine_ctx, request.clone())
.unwrap();
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// Drop the table.
let request = test_util::new_drop_request();
let mut procedure = table_engine
.drop_table_procedure(&engine_ctx, request.clone())
.unwrap();
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
// The table is dropped.
let table_ref = TableReference {
catalog: &request.catalog_name,
schema: &request.schema_name,
table: &request.table_name,
};
assert!(table_engine
.get_table(&engine_ctx, &table_ref)
.unwrap()
.is_none());
}
}

View File

@@ -30,7 +30,7 @@ use storage::EngineImpl;
use table::engine::{EngineContext, TableEngine};
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest, TableOptions,
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, InsertRequest, TableOptions,
};
use table::{Table, TableRef};
@@ -91,8 +91,8 @@ pub fn build_test_table_info() -> TableInfo {
.ident(0)
.table_version(0u64)
.table_type(TableType::Base)
.catalog_name("greptime".to_string())
.schema_name("public".to_string())
.catalog_name(DEFAULT_CATALOG_NAME.to_string())
.schema_name(DEFAULT_SCHEMA_NAME.to_string())
.build()
.unwrap()
}
@@ -108,8 +108,8 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
CreateTableRequest {
id: 1,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
desc: Some("a test table".to_string()),
schema: RawSchema::from(&*schema),
@@ -123,13 +123,21 @@ pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest {
AlterTableRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
alter_kind,
}
}
pub fn new_drop_request() -> DropTableRequest {
DropTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: TABLE_NAME.to_string(),
}
}
pub struct TestEngineComponents {
pub table_engine: MitoEngine<EngineImpl<NoopLogStore>>,
pub storage_engine: EngineImpl<NoopLogStore>,

View File

@@ -116,19 +116,26 @@ pub struct EngineContext {}
/// Procedures for table engine.
pub trait TableEngineProcedure: Send + Sync {
/// Returns a procedure that creates table by specific `request`.
/// Returns a procedure that creates a table by specific `request`.
fn create_table_procedure(
&self,
ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<BoxedProcedure>;
/// Returns a procedure that alters table by specific `request`.
/// Returns a procedure that alters a table by specific `request`.
fn alter_table_procedure(
&self,
ctx: &EngineContext,
request: AlterTableRequest,
) -> Result<BoxedProcedure>;
/// Returns a procedure that drops a table by specific `request`.
fn drop_table_procedure(
&self,
ctx: &EngineContext,
request: DropTableRequest,
) -> Result<BoxedProcedure>;
}
pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>;

View File

@@ -187,7 +187,7 @@ pub enum AlterKind {
}
/// Drop table request
#[derive(Debug)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DropTableRequest {
pub catalog_name: String,
pub schema_name: String,