From 57836e762b307b2cf0f87318e8da0392803540bc Mon Sep 17 00:00:00 2001 From: Vanish Date: Tue, 8 Aug 2023 19:23:36 +0800 Subject: [PATCH] feat: truncate table in standalone mode (#2090) * feat: impl table procedure in standalone mode * chore: remove useless changes * test: add some tests * Update src/table-procedure/src/truncate.rs Co-authored-by: Yingwen * CR * Update src/datanode/src/sql/truncate_table.rs Co-authored-by: Yingwen * chore: fmt --------- Co-authored-by: Yingwen --- src/datanode/src/sql/truncate_table.rs | 55 +++- src/file-table-engine/src/engine/immutable.rs | 16 +- src/file-table-engine/src/engine/tests.rs | 30 +- src/mito/src/engine.rs | 15 + src/mito/src/engine/procedure.rs | 5 +- src/mito/src/engine/procedure/drop.rs | 2 +- src/mito/src/engine/procedure/truncate.rs | 201 ++++++++++++ src/storage/src/region/tests/truncate.rs | 2 +- src/table-procedure/src/drop.rs | 23 ++ src/table-procedure/src/lib.rs | 9 +- src/table-procedure/src/truncate.rs | 298 ++++++++++++++++++ src/table/src/engine.rs | 7 + src/table/src/test_util/mock_engine.rs | 10 +- 13 files changed, 663 insertions(+), 10 deletions(-) create mode 100644 src/mito/src/engine/procedure/truncate.rs create mode 100644 src/table-procedure/src/truncate.rs diff --git a/src/datanode/src/sql/truncate_table.rs b/src/datanode/src/sql/truncate_table.rs index 45c859152f..7fe2b3ccbc 100644 --- a/src/datanode/src/sql/truncate_table.rs +++ b/src/datanode/src/sql/truncate_table.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_procedure::{watcher, ProcedureWithId}; use common_query::Output; +use common_telemetry::logging::info; +use snafu::ResultExt; use table::engine::TableReference; use table::requests::TruncateTableRequest; +use table_procedure::TruncateTableProcedure; -use crate::error::Result; +use crate::error::{self, Result}; use crate::sql::SqlHandler; impl SqlHandler { @@ -28,8 +32,32 @@ impl SqlHandler { table: &table_name, }; - let _table = self.get_table(&table_ref).await?; - // TODO(DevilExileSu): implement truncate table-procedure. + let table = self.get_table(&table_ref).await?; + let engine_procedure = self.engine_procedure(table)?; + + let procedure = TruncateTableProcedure::new( + req.clone(), + self.catalog_manager.clone(), + engine_procedure, + ); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; + + info!( + "Truncate table {}, table_id {} by procedure {}", + table_ref, req.table_id, procedure_id + ); + + let mut watcher = self + .procedure_manager + .submit(procedure_with_id) + .await + .context(error::SubmitProcedureSnafu { procedure_id })?; + + watcher::wait(&mut watcher) + .await + .context(error::WaitProcedureSnafu { procedure_id })?; Ok(Output::AffectedRows(0)) } } @@ -39,6 +67,7 @@ mod tests { use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::QueryRequest; + use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; use query::parser::{QueryLanguageParser, QueryStatement}; use query::query_engine::SqlStatementExecutor; @@ -50,6 +79,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_truncate_table_by_procedure() { + common_telemetry::init_default_ut_logging(); let instance = MockInstance::new("truncate_table_by_procedure").await; // Create table first. @@ -90,5 +120,24 @@ mod tests { .await .unwrap(); assert!(matches!(output, Output::AffectedRows(0))); + + // Verify table is empty. + let query = Request::Query(QueryRequest { + query: Some(Query::Sql("SELECT * FROM demo".to_string())), + }); + + let output = instance + .inner() + .do_query(query, QueryContext::arc()) + .await + .unwrap(); + if let Output::Stream(stream) = output { + let output = RecordBatches::try_collect(stream) + .await + .unwrap() + .pretty_print() + .unwrap(); + assert_eq!("++\n++", output) + } } } diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index f3e97854ca..d0ed345e9e 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -119,7 +119,10 @@ impl TableEngine for ImmutableFileTableEngine { _ctx: &EngineContext, _request: TruncateTableRequest, ) -> TableResult { - Ok(true) + table_error::UnsupportedSnafu { + operation: "TRUNCATE TABLE", + } + .fail() } } @@ -153,6 +156,17 @@ impl TableEngineProcedure for ImmutableFileTableEngine { let procedure = Box::new(DropImmutableFileTable::new(request, self.clone())); Ok(procedure) } + + fn truncate_table_procedure( + &self, + _ctx: &EngineContext, + _request: TruncateTableRequest, + ) -> TableResult { + table_error::UnsupportedSnafu { + operation: "TRUNCATE TABLE", + } + .fail() + } } #[cfg(test)] diff --git a/src/file-table-engine/src/engine/tests.rs b/src/file-table-engine/src/engine/tests.rs index e32b69a337..f2d5dcf543 100644 --- a/src/file-table-engine/src/engine/tests.rs +++ b/src/file-table-engine/src/engine/tests.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE}; use table::engine::{EngineContext, TableEngine, TableEngineProcedure}; -use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest}; +use table::requests::{ + AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest, +}; use table::{error as table_error, Table}; use crate::config::EngineConfig; @@ -216,3 +218,29 @@ async fn test_create_drop_table_procedure() { .unwrap() .is_none()); } + +#[tokio::test] +async fn test_truncate_table() { + common_telemetry::init_default_ut_logging(); + let TestEngineComponents { + table_engine, + dir: _dir, + table_ref, + .. + } = test_util::setup_test_engine_and_table("test_truncate_table").await; + + let truncate_req = TruncateTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: TEST_TABLE_NAME.to_string(), + table_id: table_ref.table_info().ident.table_id, + }; + + let unsupported = table_engine + .truncate_table(&EngineContext::default(), truncate_req) + .await + .err() + .unwrap(); + + assert_matches!(unsupported, table_error::Error::Unsupported { .. }) +} diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index a4f8a27b48..3f45df8afd 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -47,6 +47,7 @@ use table::requests::{ }; use table::{error as table_error, Result as TableResult, Table, TableRef}; +use self::procedure::TruncateMitoTable; use crate::config::EngineConfig; use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable, TableCreator}; use crate::error::{ @@ -238,6 +239,19 @@ impl TableEngineProcedure for MitoEngine { ); Ok(procedure) } + + fn truncate_table_procedure( + &self, + _ctx: &EngineContext, + request: TruncateTableRequest, + ) -> TableResult { + let procedure = Box::new( + TruncateMitoTable::new(request, self.inner.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?, + ); + Ok(procedure) + } } pub(crate) struct MitoEngineInner { @@ -731,6 +745,7 @@ impl MitoEngineInner { .await .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; + Ok(true) } else { Ok(false) diff --git a/src/mito/src/engine/procedure.rs b/src/mito/src/engine/procedure.rs index 1ffdfcc2a5..f62e3b19f1 100644 --- a/src/mito/src/engine/procedure.rs +++ b/src/mito/src/engine/procedure.rs @@ -15,6 +15,7 @@ mod alter; mod create; mod drop; +mod truncate; use std::sync::Arc; @@ -23,6 +24,7 @@ use common_procedure::ProcedureManager; pub(crate) use create::{CreateMitoTable, TableCreator}; pub(crate) use drop::DropMitoTable; use store_api::storage::StorageEngine; +pub(crate) use truncate::TruncateMitoTable; use crate::engine::MitoEngineInner; @@ -37,7 +39,8 @@ pub(crate) fn register_procedure_loaders( // The procedure names are expected to be unique, so we just panic on error. CreateMitoTable::register_loader(engine_inner.clone(), procedure_manager); AlterMitoTable::register_loader(engine_inner.clone(), procedure_manager); - DropMitoTable::register_loader(engine_inner, procedure_manager); + DropMitoTable::register_loader(engine_inner.clone(), procedure_manager); + TruncateMitoTable::register_loader(engine_inner, procedure_manager) } #[cfg(test)] diff --git a/src/mito/src/engine/procedure/drop.rs b/src/mito/src/engine/procedure/drop.rs index dd314484a8..7379c62be5 100644 --- a/src/mito/src/engine/procedure/drop.rs +++ b/src/mito/src/engine/procedure/drop.rs @@ -171,7 +171,7 @@ mod tests { let TestEnv { table_engine, dir: _dir, - } = procedure_test_util::setup_test_engine("add_column").await; + } = procedure_test_util::setup_test_engine("drop_table").await; let schema = Arc::new(test_util::schema_for_test()); let request = test_util::new_create_request(schema.clone()); diff --git a/src/mito/src/engine/procedure/truncate.rs b/src/mito/src/engine/procedure/truncate.rs new file mode 100644 index 0000000000..71c53dfd85 --- /dev/null +++ b/src/mito/src/engine/procedure/truncate.rs @@ -0,0 +1,201 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::storage::StorageEngine; +use table::engine::TableReference; +use table::requests::TruncateTableRequest; +use table::Table; + +use crate::engine::MitoEngineInner; +use crate::table::MitoTable; + +/// Procedure to truncate a [MitoTable]. +pub(crate) struct TruncateMitoTable { + data: TruncateTableData, + engine_inner: Arc>, + table: Option>>, +} + +#[async_trait] +impl Procedure for TruncateMitoTable { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + match self.data.state { + TruncateTableState::Prepare => self.on_prepare(), + TruncateTableState::EngineTruncateTable => self.on_engine_truncate_table().await, + } + } + + fn dump(&self) -> Result { + let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?; + Ok(json) + } + + fn lock_key(&self) -> LockKey { + let table_ref = self.data.table_ref(); + let Some(table) = &self.table else { + return LockKey::default(); + }; + let info = table.table_info(); + let keys = info + .meta + .region_numbers + .iter() + .map(|number| format!("{table_ref}/region-{number}")); + LockKey::new(keys) + } +} + +impl TruncateMitoTable { + const TYPE_NAME: &str = "mito::TruncateMitoTable"; + + /// Returns a new [TruncateMitoTable]. + pub(crate) fn new( + request: TruncateTableRequest, + engine_inner: Arc>, + ) -> Result { + let data = TruncateTableData { + state: TruncateTableState::Prepare, + request, + }; + let table = engine_inner.get_mito_table(data.request.table_id); + + Ok(TruncateMitoTable { + data, + engine_inner, + table, + }) + } + + /// Register the loader of this procedure to the `procedure_manager`. + /// + /// # Panics + /// Panics on error. + pub(crate) fn register_loader( + engine_inner: Arc>, + procedure_manager: &dyn ProcedureManager, + ) { + procedure_manager + .register_loader( + Self::TYPE_NAME, + Box::new(move |data| { + Self::from_json(data, engine_inner.clone()).map(|p| Box::new(p) as _) + }), + ) + .unwrap() + } + + /// Recover the procedure from json. + fn from_json(json: &str, engine_inner: Arc>) -> Result { + let data: TruncateTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + let table = engine_inner.get_mito_table(data.request.table_id); + + Ok(TruncateMitoTable { + data, + engine_inner, + table, + }) + } + + /// Prepare table info. + fn on_prepare(&mut self) -> Result { + self.data.state = TruncateTableState::EngineTruncateTable; + + Ok(Status::executing(true)) + } + + async fn on_engine_truncate_table(&mut self) -> Result { + let engine = &self.engine_inner; + engine + .truncate_table(self.data.request.clone()) + .await + .map_err(Error::from_error_ext)?; + Ok(Status::Done) + } +} + +/// Represents each step while truncating table in the mito engine. +#[derive(Debug, Serialize, Deserialize)] +enum TruncateTableState { + /// Prepare to truncate the table. + Prepare, + /// Engine truncate the table. + EngineTruncateTable, +} + +/// Serializable data of [TruncateMitoTable]. +#[derive(Debug, Serialize, Deserialize)] +struct TruncateTableData { + state: TruncateTableState, + request: TruncateTableRequest, +} + +impl TruncateTableData { + fn table_ref(&self) -> TableReference { + self.request.table_ref() + } +} + +#[cfg(test)] +mod tests { + use table::engine::{EngineContext, TableEngine, TableEngineProcedure}; + + use super::*; + use crate::engine::procedure::procedure_test_util::{self, TestEnv}; + use crate::table::test_util; + + #[tokio::test] + async fn test_procedure_truncate_table() { + common_telemetry::init_default_ut_logging(); + + let TestEnv { + table_engine, + dir: _dir, + } = procedure_test_util::setup_test_engine("truncate_table").await; + let schema = Arc::new(test_util::schema_for_test()); + let request = test_util::new_create_request(schema.clone()); + + let engine_ctx = EngineContext::default(); + // Create table first. + let mut procedure = table_engine + .create_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + let table_id = request.id; + + let request = test_util::new_truncate_request(); + + // Truncate the table. + let mut procedure = table_engine + .truncate_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + assert!(table_engine + .get_table(&engine_ctx, table_id) + .unwrap() + .is_some()); + } +} diff --git a/src/storage/src/region/tests/truncate.rs b/src/storage/src/region/tests/truncate.rs index 100e990c4d..71d2da5bc6 100644 --- a/src/storage/src/region/tests/truncate.rs +++ b/src/storage/src/region/tests/truncate.rs @@ -222,7 +222,7 @@ async fn test_truncate_reopen() { // Persist the meta action. let prev_version = manifest_version; action_list.set_prev_version(prev_version); - assert!(manifest.update(action_list).await.is_ok()); + manifest.update(action_list).await.unwrap(); // Reopen and put data. tester.reopen().await; diff --git a/src/table-procedure/src/drop.rs b/src/table-procedure/src/drop.rs index 82062c826a..9db9f37723 100644 --- a/src/table-procedure/src/drop.rs +++ b/src/table-procedure/src/drop.rs @@ -303,4 +303,27 @@ mod tests { let ctx = EngineContext::default(); assert!(!table_engine.table_exists(&ctx, table_id,)); } + + #[tokio::test] + async fn test_drop_not_exists_table() { + common_telemetry::init_default_ut_logging(); + let TestEnv { + dir: _, + table_engine, + procedure_manager: _, + catalog_manager, + } = TestEnv::new("drop"); + let table_name = "test_drop"; + + let request = DropTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id: 0, + }; + + let mut procedure = + DropTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); + assert!(procedure.on_prepare().await.is_err()); + } } diff --git a/src/table-procedure/src/lib.rs b/src/table-procedure/src/lib.rs index 0778a3fac2..0963adb474 100644 --- a/src/table-procedure/src/lib.rs +++ b/src/table-procedure/src/lib.rs @@ -18,6 +18,7 @@ mod alter; mod create; mod drop; pub mod error; +mod truncate; pub use alter::AlterTableProcedure; use catalog::CatalogManagerRef; @@ -25,6 +26,7 @@ use common_procedure::ProcedureManager; pub use create::CreateTableProcedure; pub use drop::DropTableProcedure; use table::engine::{TableEngineProcedureRef, TableEngineRef}; +pub use truncate::TruncateTableProcedure; /// Register all procedure loaders to the procedure manager. /// @@ -48,7 +50,12 @@ pub fn register_procedure_loaders( engine_procedure.clone(), procedure_manager, ); - DropTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager); + DropTableProcedure::register_loader( + catalog_manager.clone(), + engine_procedure.clone(), + procedure_manager, + ); + TruncateTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager) } #[cfg(test)] diff --git a/src/table-procedure/src/truncate.rs b/src/table-procedure/src/truncate.rs new file mode 100644 index 0000000000..868b077b7d --- /dev/null +++ b/src/table-procedure/src/truncate.rs @@ -0,0 +1,298 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Procedure to truncate a table. + +use async_trait::async_trait; +use catalog::CatalogManagerRef; +use common_procedure::error::SubprocedureFailedSnafu; +use common_procedure::{ + Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState, + ProcedureWithId, Result, Status, +}; +use common_telemetry::logging; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use table::engine::{EngineContext, TableEngineProcedureRef, TableReference}; +use table::requests::TruncateTableRequest; + +use crate::error::{ + AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu, +}; + +/// Procedure to truncate a table. +#[allow(dead_code)] +pub struct TruncateTableProcedure { + data: TruncateTableData, + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, +} + +#[async_trait] +impl Procedure for TruncateTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, ctx: &Context) -> Result { + match self.data.state { + TruncateTableState::Prepare => self.on_prepare().await, + TruncateTableState::EngineTruncateTable => self.on_engine_truncate_table(ctx).await, + } + } + + fn dump(&self) -> Result { + let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?; + Ok(json) + } + + fn lock_key(&self) -> LockKey { + // We lock the whole table. + let table_name = self.data.table_ref().to_string(); + LockKey::single(table_name) + } +} + +impl TruncateTableProcedure { + const TYPE_NAME: &str = "table-procedure::TruncateTableProcedure"; + + /// Returns a new [TruncateTableProcedure]. + pub fn new( + request: TruncateTableRequest, + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + ) -> TruncateTableProcedure { + TruncateTableProcedure { + data: TruncateTableData { + state: TruncateTableState::Prepare, + request, + subprocedure_id: None, + }, + catalog_manager, + engine_procedure, + } + } + + /// Register the loader of this procedure to the `procedure_manager`. + /// + /// # Panics + /// Panics on error. + pub fn register_loader( + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + procedure_manager: &dyn ProcedureManager, + ) { + procedure_manager + .register_loader( + Self::TYPE_NAME, + Box::new(move |data| { + Self::from_json(data, catalog_manager.clone(), engine_procedure.clone()) + .map(|p| Box::new(p) as _) + }), + ) + .unwrap() + } + + /// Recover the procedure from json. + fn from_json( + json: &str, + catalog_manager: CatalogManagerRef, + engine_procedure: TableEngineProcedureRef, + ) -> Result { + let data: TruncateTableData = + serde_json::from_str(json).context(DeserializeProcedureSnafu)?; + + Ok(TruncateTableProcedure { + data, + catalog_manager, + engine_procedure, + }) + } + + async fn on_prepare(&mut self) -> Result { + let request = &self.data.request; + // Ensure the table exists. + let table_exists = self + .catalog_manager + .table_exist( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .await + .context(AccessCatalogSnafu)?; + ensure!( + table_exists, + TableNotFoundSnafu { + name: &request.table_name, + } + ); + + self.data.state = TruncateTableState::EngineTruncateTable; + // Assign procedure id to the subprocedure. + self.data.subprocedure_id = Some(ProcedureId::random()); + + Ok(Status::executing(true)) + } + + async fn on_engine_truncate_table(&mut self, ctx: &Context) -> Result { + // Safety: subprocedure id is always set in this state. + let sub_id = self.data.subprocedure_id.unwrap(); + + // Query subprocedure state. + let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else { + logging::info!( + "On engine truncate table {}, subprocedure not found, sub_id: {}", + self.data.request.table_name, + sub_id, + ); + // If the subprocedure is not found, we create a new subprocedure with the same id. + let engine_ctx = EngineContext::default(); + + let procedure = self + .engine_procedure + .truncate_table_procedure(&engine_ctx, self.data.request.clone()) + .map_err(Error::from_error_ext)?; + + return Ok(Status::Suspended { + subprocedures: vec![ProcedureWithId { + id: sub_id, + procedure, + }], + persist: true, + }); + }; + + match sub_state { + ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended { + subprocedures: Vec::new(), + persist: false, + }), + ProcedureState::Done => { + logging::info!( + "On engine truncate table {}, done, sub_id: {}", + self.data.request.table_name, + sub_id + ); + + Ok(Status::Done) + } + ProcedureState::Failed { error } => { + // Return error if the subprocedure is failed. + Err(error).context(SubprocedureFailedSnafu { + subprocedure_id: sub_id, + })? + } + } + } +} + +/// Represents each step while truncating a table in the datanode. +#[derive(Debug, Serialize, Deserialize)] +enum TruncateTableState { + /// Validate request and prepare to drop table. + Prepare, + /// Truncate table in the table engine. + EngineTruncateTable, +} + +/// Serializable data of [TruncateTableProcedure]. +#[derive(Debug, Serialize, Deserialize)] +struct TruncateTableData { + /// Current state. + state: TruncateTableState, + /// Request to truncate this table. + request: TruncateTableRequest, + /// Id of the subprocedure to truncate this table from the engine. + /// + /// This id is `Some` while the procedure is in [TruncateTableState::EngineTruncateTable] + /// state. + subprocedure_id: Option, +} + +impl TruncateTableData { + fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.request.catalog_name, + schema: &self.request.schema_name, + table: &self.request.table_name, + } + } +} + +#[cfg(test)] +mod tests { + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use table::engine::TableEngine; + + use super::*; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_truncate_table_procedure() { + let env = TestEnv::new("truncate"); + let table_name = "test_truncate"; + let table_id = env.create_table(table_name).await; + + let request = TruncateTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id, + }; + let TestEnv { + dir: _dir, + table_engine, + procedure_manager, + catalog_manager, + } = env; + let procedure = + TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap(); + watcher.changed().await.unwrap(); + + assert!(catalog_manager + .table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) + .await + .unwrap()); + + let ctx = EngineContext::default(); + assert!(table_engine.table_exists(&ctx, table_id)); + } + + #[tokio::test] + async fn test_truncate_not_exists_table() { + common_telemetry::init_default_ut_logging(); + let TestEnv { + dir: _, + table_engine, + procedure_manager: _, + catalog_manager, + } = TestEnv::new("truncate"); + let table_name = "test_truncate"; + + let request = TruncateTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id: 0, + }; + + let mut procedure = + TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone()); + assert!(procedure.on_prepare().await.is_err()); + } +} diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 2121aa32b7..d26dc0b768 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -169,6 +169,13 @@ pub trait TableEngineProcedure: Send + Sync { ctx: &EngineContext, request: DropTableRequest, ) -> Result; + + /// Returns a procedure that truncates a table by specific `request`. + fn truncate_table_procedure( + &self, + ctx: &EngineContext, + request: TruncateTableRequest, + ) -> Result; } pub type TableEngineProcedureRef = Arc; diff --git a/src/table/src/test_util/mock_engine.rs b/src/table/src/test_util/mock_engine.rs index d6be35d8ec..f6eba2adb8 100644 --- a/src/table/src/test_util/mock_engine.rs +++ b/src/table/src/test_util/mock_engine.rs @@ -111,7 +111,7 @@ impl TableEngine for MockTableEngine { _ctx: &EngineContext, _request: TruncateTableRequest, ) -> Result { - Ok(true) + unimplemented!() } } @@ -139,4 +139,12 @@ impl TableEngineProcedure for MockTableEngine { ) -> Result { unimplemented!() } + + fn truncate_table_procedure( + &self, + _ctx: &EngineContext, + _request: TruncateTableRequest, + ) -> Result { + unimplemented!() + } }