mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 22:32:55 +00:00
feat: truncate table in standalone mode (#2090)
* feat: impl table procedure in standalone mode * chore: remove useless changes * test: add some tests * Update src/table-procedure/src/truncate.rs Co-authored-by: Yingwen <realevenyag@gmail.com> * CR * Update src/datanode/src/sql/truncate_table.rs Co-authored-by: Yingwen <realevenyag@gmail.com> * chore: fmt --------- Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
@@ -12,11 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_procedure::{watcher, ProcedureWithId};
|
||||
use common_query::Output;
|
||||
use common_telemetry::logging::info;
|
||||
use snafu::ResultExt;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::TruncateTableRequest;
|
||||
use table_procedure::TruncateTableProcedure;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::error::{self, Result};
|
||||
use crate::sql::SqlHandler;
|
||||
|
||||
impl SqlHandler {
|
||||
@@ -28,8 +32,32 @@ impl SqlHandler {
|
||||
table: &table_name,
|
||||
};
|
||||
|
||||
let _table = self.get_table(&table_ref).await?;
|
||||
// TODO(DevilExileSu): implement truncate table-procedure.
|
||||
let table = self.get_table(&table_ref).await?;
|
||||
let engine_procedure = self.engine_procedure(table)?;
|
||||
|
||||
let procedure = TruncateTableProcedure::new(
|
||||
req.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
engine_procedure,
|
||||
);
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
let procedure_id = procedure_with_id.id;
|
||||
|
||||
info!(
|
||||
"Truncate table {}, table_id {} by procedure {}",
|
||||
table_ref, req.table_id, procedure_id
|
||||
);
|
||||
|
||||
let mut watcher = self
|
||||
.procedure_manager
|
||||
.submit(procedure_with_id)
|
||||
.await
|
||||
.context(error::SubmitProcedureSnafu { procedure_id })?;
|
||||
|
||||
watcher::wait(&mut watcher)
|
||||
.await
|
||||
.context(error::WaitProcedureSnafu { procedure_id })?;
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
}
|
||||
@@ -39,6 +67,7 @@ mod tests {
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::query_request::Query;
|
||||
use api::v1::QueryRequest;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use query::parser::{QueryLanguageParser, QueryStatement};
|
||||
use query::query_engine::SqlStatementExecutor;
|
||||
@@ -50,6 +79,7 @@ mod tests {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_truncate_table_by_procedure() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let instance = MockInstance::new("truncate_table_by_procedure").await;
|
||||
|
||||
// Create table first.
|
||||
@@ -90,5 +120,24 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(matches!(output, Output::AffectedRows(0)));
|
||||
|
||||
// Verify table is empty.
|
||||
let query = Request::Query(QueryRequest {
|
||||
query: Some(Query::Sql("SELECT * FROM demo".to_string())),
|
||||
});
|
||||
|
||||
let output = instance
|
||||
.inner()
|
||||
.do_query(query, QueryContext::arc())
|
||||
.await
|
||||
.unwrap();
|
||||
if let Output::Stream(stream) = output {
|
||||
let output = RecordBatches::try_collect(stream)
|
||||
.await
|
||||
.unwrap()
|
||||
.pretty_print()
|
||||
.unwrap();
|
||||
assert_eq!("++\n++", output)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,7 +119,10 @@ impl TableEngine for ImmutableFileTableEngine {
|
||||
_ctx: &EngineContext,
|
||||
_request: TruncateTableRequest,
|
||||
) -> TableResult<bool> {
|
||||
Ok(true)
|
||||
table_error::UnsupportedSnafu {
|
||||
operation: "TRUNCATE TABLE",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,6 +156,17 @@ impl TableEngineProcedure for ImmutableFileTableEngine {
|
||||
let procedure = Box::new(DropImmutableFileTable::new(request, self.clone()));
|
||||
Ok(procedure)
|
||||
}
|
||||
|
||||
fn truncate_table_procedure(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
_request: TruncateTableRequest,
|
||||
) -> TableResult<BoxedProcedure> {
|
||||
table_error::UnsupportedSnafu {
|
||||
operation: "TRUNCATE TABLE",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -17,7 +17,9 @@ use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, IMMUTABLE_FILE_ENGINE};
|
||||
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
|
||||
use table::requests::{AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest};
|
||||
use table::requests::{
|
||||
AlterKind, AlterTableRequest, DropTableRequest, OpenTableRequest, TruncateTableRequest,
|
||||
};
|
||||
use table::{error as table_error, Table};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
@@ -216,3 +218,29 @@ async fn test_create_drop_table_procedure() {
|
||||
.unwrap()
|
||||
.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_truncate_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let TestEngineComponents {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
table_ref,
|
||||
..
|
||||
} = test_util::setup_test_engine_and_table("test_truncate_table").await;
|
||||
|
||||
let truncate_req = TruncateTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: TEST_TABLE_NAME.to_string(),
|
||||
table_id: table_ref.table_info().ident.table_id,
|
||||
};
|
||||
|
||||
let unsupported = table_engine
|
||||
.truncate_table(&EngineContext::default(), truncate_req)
|
||||
.await
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
assert_matches!(unsupported, table_error::Error::Unsupported { .. })
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ use table::requests::{
|
||||
};
|
||||
use table::{error as table_error, Result as TableResult, Table, TableRef};
|
||||
|
||||
use self::procedure::TruncateMitoTable;
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable, TableCreator};
|
||||
use crate::error::{
|
||||
@@ -238,6 +239,19 @@ impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
|
||||
);
|
||||
Ok(procedure)
|
||||
}
|
||||
|
||||
fn truncate_table_procedure(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
request: TruncateTableRequest,
|
||||
) -> TableResult<BoxedProcedure> {
|
||||
let procedure = Box::new(
|
||||
TruncateMitoTable::new(request, self.inner.clone())
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?,
|
||||
);
|
||||
Ok(procedure)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct MitoEngineInner<S: StorageEngine> {
|
||||
@@ -731,6 +745,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
mod alter;
|
||||
mod create;
|
||||
mod drop;
|
||||
mod truncate;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -23,6 +24,7 @@ use common_procedure::ProcedureManager;
|
||||
pub(crate) use create::{CreateMitoTable, TableCreator};
|
||||
pub(crate) use drop::DropMitoTable;
|
||||
use store_api::storage::StorageEngine;
|
||||
pub(crate) use truncate::TruncateMitoTable;
|
||||
|
||||
use crate::engine::MitoEngineInner;
|
||||
|
||||
@@ -37,7 +39,8 @@ pub(crate) fn register_procedure_loaders<S: StorageEngine>(
|
||||
// The procedure names are expected to be unique, so we just panic on error.
|
||||
CreateMitoTable::register_loader(engine_inner.clone(), procedure_manager);
|
||||
AlterMitoTable::register_loader(engine_inner.clone(), procedure_manager);
|
||||
DropMitoTable::register_loader(engine_inner, procedure_manager);
|
||||
DropMitoTable::register_loader(engine_inner.clone(), procedure_manager);
|
||||
TruncateMitoTable::register_loader(engine_inner, procedure_manager)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -171,7 +171,7 @@ mod tests {
|
||||
let TestEnv {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
} = procedure_test_util::setup_test_engine("add_column").await;
|
||||
} = procedure_test_util::setup_test_engine("drop_table").await;
|
||||
let schema = Arc::new(test_util::schema_for_test());
|
||||
let request = test_util::new_create_request(schema.clone());
|
||||
|
||||
|
||||
201
src/mito/src/engine/procedure/truncate.rs
Normal file
201
src/mito/src/engine/procedure/truncate.rs
Normal file
@@ -0,0 +1,201 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::StorageEngine;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::TruncateTableRequest;
|
||||
use table::Table;
|
||||
|
||||
use crate::engine::MitoEngineInner;
|
||||
use crate::table::MitoTable;
|
||||
|
||||
/// Procedure to truncate a [MitoTable].
|
||||
pub(crate) struct TruncateMitoTable<S: StorageEngine> {
|
||||
data: TruncateTableData,
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
table: Option<Arc<MitoTable<S::Region>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S: StorageEngine> Procedure for TruncateMitoTable<S> {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
|
||||
match self.data.state {
|
||||
TruncateTableState::Prepare => self.on_prepare(),
|
||||
TruncateTableState::EngineTruncateTable => self.on_engine_truncate_table().await,
|
||||
}
|
||||
}
|
||||
|
||||
fn dump(&self) -> Result<String> {
|
||||
let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?;
|
||||
Ok(json)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
let table_ref = self.data.table_ref();
|
||||
let Some(table) = &self.table else {
|
||||
return LockKey::default();
|
||||
};
|
||||
let info = table.table_info();
|
||||
let keys = info
|
||||
.meta
|
||||
.region_numbers
|
||||
.iter()
|
||||
.map(|number| format!("{table_ref}/region-{number}"));
|
||||
LockKey::new(keys)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StorageEngine> TruncateMitoTable<S> {
|
||||
const TYPE_NAME: &str = "mito::TruncateMitoTable";
|
||||
|
||||
/// Returns a new [TruncateMitoTable].
|
||||
pub(crate) fn new(
|
||||
request: TruncateTableRequest,
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
) -> Result<Self> {
|
||||
let data = TruncateTableData {
|
||||
state: TruncateTableState::Prepare,
|
||||
request,
|
||||
};
|
||||
let table = engine_inner.get_mito_table(data.request.table_id);
|
||||
|
||||
Ok(TruncateMitoTable {
|
||||
data,
|
||||
engine_inner,
|
||||
table,
|
||||
})
|
||||
}
|
||||
|
||||
/// Register the loader of this procedure to the `procedure_manager`.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics on error.
|
||||
pub(crate) fn register_loader(
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
procedure_manager: &dyn ProcedureManager,
|
||||
) {
|
||||
procedure_manager
|
||||
.register_loader(
|
||||
Self::TYPE_NAME,
|
||||
Box::new(move |data| {
|
||||
Self::from_json(data, engine_inner.clone()).map(|p| Box::new(p) as _)
|
||||
}),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Recover the procedure from json.
|
||||
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
|
||||
let data: TruncateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
let table = engine_inner.get_mito_table(data.request.table_id);
|
||||
|
||||
Ok(TruncateMitoTable {
|
||||
data,
|
||||
engine_inner,
|
||||
table,
|
||||
})
|
||||
}
|
||||
|
||||
/// Prepare table info.
|
||||
fn on_prepare(&mut self) -> Result<Status> {
|
||||
self.data.state = TruncateTableState::EngineTruncateTable;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
async fn on_engine_truncate_table(&mut self) -> Result<Status> {
|
||||
let engine = &self.engine_inner;
|
||||
engine
|
||||
.truncate_table(self.data.request.clone())
|
||||
.await
|
||||
.map_err(Error::from_error_ext)?;
|
||||
Ok(Status::Done)
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents each step while truncating table in the mito engine.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
enum TruncateTableState {
|
||||
/// Prepare to truncate the table.
|
||||
Prepare,
|
||||
/// Engine truncate the table.
|
||||
EngineTruncateTable,
|
||||
}
|
||||
|
||||
/// Serializable data of [TruncateMitoTable].
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct TruncateTableData {
|
||||
state: TruncateTableState,
|
||||
request: TruncateTableRequest,
|
||||
}
|
||||
|
||||
impl TruncateTableData {
|
||||
fn table_ref(&self) -> TableReference {
|
||||
self.request.table_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
|
||||
|
||||
use super::*;
|
||||
use crate::engine::procedure::procedure_test_util::{self, TestEnv};
|
||||
use crate::table::test_util;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_procedure_truncate_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let TestEnv {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
} = procedure_test_util::setup_test_engine("truncate_table").await;
|
||||
let schema = Arc::new(test_util::schema_for_test());
|
||||
let request = test_util::new_create_request(schema.clone());
|
||||
|
||||
let engine_ctx = EngineContext::default();
|
||||
// Create table first.
|
||||
let mut procedure = table_engine
|
||||
.create_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
let table_id = request.id;
|
||||
|
||||
let request = test_util::new_truncate_request();
|
||||
|
||||
// Truncate the table.
|
||||
let mut procedure = table_engine
|
||||
.truncate_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, table_id)
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
}
|
||||
@@ -222,7 +222,7 @@ async fn test_truncate_reopen() {
|
||||
// Persist the meta action.
|
||||
let prev_version = manifest_version;
|
||||
action_list.set_prev_version(prev_version);
|
||||
assert!(manifest.update(action_list).await.is_ok());
|
||||
manifest.update(action_list).await.unwrap();
|
||||
|
||||
// Reopen and put data.
|
||||
tester.reopen().await;
|
||||
|
||||
@@ -303,4 +303,27 @@ mod tests {
|
||||
let ctx = EngineContext::default();
|
||||
assert!(!table_engine.table_exists(&ctx, table_id,));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_drop_not_exists_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let TestEnv {
|
||||
dir: _,
|
||||
table_engine,
|
||||
procedure_manager: _,
|
||||
catalog_manager,
|
||||
} = TestEnv::new("drop");
|
||||
let table_name = "test_drop";
|
||||
|
||||
let request = DropTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id: 0,
|
||||
};
|
||||
|
||||
let mut procedure =
|
||||
DropTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
|
||||
assert!(procedure.on_prepare().await.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ mod alter;
|
||||
mod create;
|
||||
mod drop;
|
||||
pub mod error;
|
||||
mod truncate;
|
||||
|
||||
pub use alter::AlterTableProcedure;
|
||||
use catalog::CatalogManagerRef;
|
||||
@@ -25,6 +26,7 @@ use common_procedure::ProcedureManager;
|
||||
pub use create::CreateTableProcedure;
|
||||
pub use drop::DropTableProcedure;
|
||||
use table::engine::{TableEngineProcedureRef, TableEngineRef};
|
||||
pub use truncate::TruncateTableProcedure;
|
||||
|
||||
/// Register all procedure loaders to the procedure manager.
|
||||
///
|
||||
@@ -48,7 +50,12 @@ pub fn register_procedure_loaders(
|
||||
engine_procedure.clone(),
|
||||
procedure_manager,
|
||||
);
|
||||
DropTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager);
|
||||
DropTableProcedure::register_loader(
|
||||
catalog_manager.clone(),
|
||||
engine_procedure.clone(),
|
||||
procedure_manager,
|
||||
);
|
||||
TruncateTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
298
src/table-procedure/src/truncate.rs
Normal file
298
src/table-procedure/src/truncate.rs
Normal file
@@ -0,0 +1,298 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Procedure to truncate a table.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_procedure::error::SubprocedureFailedSnafu;
|
||||
use common_procedure::{
|
||||
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
|
||||
ProcedureWithId, Result, Status,
|
||||
};
|
||||
use common_telemetry::logging;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
|
||||
use table::requests::TruncateTableRequest;
|
||||
|
||||
use crate::error::{
|
||||
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
|
||||
/// Procedure to truncate a table.
|
||||
#[allow(dead_code)]
|
||||
pub struct TruncateTableProcedure {
|
||||
data: TruncateTableData,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
engine_procedure: TableEngineProcedureRef,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Procedure for TruncateTableProcedure {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, ctx: &Context) -> Result<Status> {
|
||||
match self.data.state {
|
||||
TruncateTableState::Prepare => self.on_prepare().await,
|
||||
TruncateTableState::EngineTruncateTable => self.on_engine_truncate_table(ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
fn dump(&self) -> Result<String> {
|
||||
let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?;
|
||||
Ok(json)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
// We lock the whole table.
|
||||
let table_name = self.data.table_ref().to_string();
|
||||
LockKey::single(table_name)
|
||||
}
|
||||
}
|
||||
|
||||
impl TruncateTableProcedure {
|
||||
const TYPE_NAME: &str = "table-procedure::TruncateTableProcedure";
|
||||
|
||||
/// Returns a new [TruncateTableProcedure].
|
||||
pub fn new(
|
||||
request: TruncateTableRequest,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
engine_procedure: TableEngineProcedureRef,
|
||||
) -> TruncateTableProcedure {
|
||||
TruncateTableProcedure {
|
||||
data: TruncateTableData {
|
||||
state: TruncateTableState::Prepare,
|
||||
request,
|
||||
subprocedure_id: None,
|
||||
},
|
||||
catalog_manager,
|
||||
engine_procedure,
|
||||
}
|
||||
}
|
||||
|
||||
/// Register the loader of this procedure to the `procedure_manager`.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics on error.
|
||||
pub fn register_loader(
|
||||
catalog_manager: CatalogManagerRef,
|
||||
engine_procedure: TableEngineProcedureRef,
|
||||
procedure_manager: &dyn ProcedureManager,
|
||||
) {
|
||||
procedure_manager
|
||||
.register_loader(
|
||||
Self::TYPE_NAME,
|
||||
Box::new(move |data| {
|
||||
Self::from_json(data, catalog_manager.clone(), engine_procedure.clone())
|
||||
.map(|p| Box::new(p) as _)
|
||||
}),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Recover the procedure from json.
|
||||
fn from_json(
|
||||
json: &str,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
engine_procedure: TableEngineProcedureRef,
|
||||
) -> Result<Self> {
|
||||
let data: TruncateTableData =
|
||||
serde_json::from_str(json).context(DeserializeProcedureSnafu)?;
|
||||
|
||||
Ok(TruncateTableProcedure {
|
||||
data,
|
||||
catalog_manager,
|
||||
engine_procedure,
|
||||
})
|
||||
}
|
||||
|
||||
async fn on_prepare(&mut self) -> Result<Status> {
|
||||
let request = &self.data.request;
|
||||
// Ensure the table exists.
|
||||
let table_exists = self
|
||||
.catalog_manager
|
||||
.table_exist(
|
||||
&request.catalog_name,
|
||||
&request.schema_name,
|
||||
&request.table_name,
|
||||
)
|
||||
.await
|
||||
.context(AccessCatalogSnafu)?;
|
||||
ensure!(
|
||||
table_exists,
|
||||
TableNotFoundSnafu {
|
||||
name: &request.table_name,
|
||||
}
|
||||
);
|
||||
|
||||
self.data.state = TruncateTableState::EngineTruncateTable;
|
||||
// Assign procedure id to the subprocedure.
|
||||
self.data.subprocedure_id = Some(ProcedureId::random());
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
async fn on_engine_truncate_table(&mut self, ctx: &Context) -> Result<Status> {
|
||||
// Safety: subprocedure id is always set in this state.
|
||||
let sub_id = self.data.subprocedure_id.unwrap();
|
||||
|
||||
// Query subprocedure state.
|
||||
let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else {
|
||||
logging::info!(
|
||||
"On engine truncate table {}, subprocedure not found, sub_id: {}",
|
||||
self.data.request.table_name,
|
||||
sub_id,
|
||||
);
|
||||
// If the subprocedure is not found, we create a new subprocedure with the same id.
|
||||
let engine_ctx = EngineContext::default();
|
||||
|
||||
let procedure = self
|
||||
.engine_procedure
|
||||
.truncate_table_procedure(&engine_ctx, self.data.request.clone())
|
||||
.map_err(Error::from_error_ext)?;
|
||||
|
||||
return Ok(Status::Suspended {
|
||||
subprocedures: vec![ProcedureWithId {
|
||||
id: sub_id,
|
||||
procedure,
|
||||
}],
|
||||
persist: true,
|
||||
});
|
||||
};
|
||||
|
||||
match sub_state {
|
||||
ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
|
||||
subprocedures: Vec::new(),
|
||||
persist: false,
|
||||
}),
|
||||
ProcedureState::Done => {
|
||||
logging::info!(
|
||||
"On engine truncate table {}, done, sub_id: {}",
|
||||
self.data.request.table_name,
|
||||
sub_id
|
||||
);
|
||||
|
||||
Ok(Status::Done)
|
||||
}
|
||||
ProcedureState::Failed { error } => {
|
||||
// Return error if the subprocedure is failed.
|
||||
Err(error).context(SubprocedureFailedSnafu {
|
||||
subprocedure_id: sub_id,
|
||||
})?
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents each step while truncating a table in the datanode.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
enum TruncateTableState {
|
||||
/// Validate request and prepare to drop table.
|
||||
Prepare,
|
||||
/// Truncate table in the table engine.
|
||||
EngineTruncateTable,
|
||||
}
|
||||
|
||||
/// Serializable data of [TruncateTableProcedure].
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct TruncateTableData {
|
||||
/// Current state.
|
||||
state: TruncateTableState,
|
||||
/// Request to truncate this table.
|
||||
request: TruncateTableRequest,
|
||||
/// Id of the subprocedure to truncate this table from the engine.
|
||||
///
|
||||
/// This id is `Some` while the procedure is in [TruncateTableState::EngineTruncateTable]
|
||||
/// state.
|
||||
subprocedure_id: Option<ProcedureId>,
|
||||
}
|
||||
|
||||
impl TruncateTableData {
|
||||
fn table_ref(&self) -> TableReference {
|
||||
TableReference {
|
||||
catalog: &self.request.catalog_name,
|
||||
schema: &self.request.schema_name,
|
||||
table: &self.request.table_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use table::engine::TableEngine;
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::TestEnv;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_truncate_table_procedure() {
|
||||
let env = TestEnv::new("truncate");
|
||||
let table_name = "test_truncate";
|
||||
let table_id = env.create_table(table_name).await;
|
||||
|
||||
let request = TruncateTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id,
|
||||
};
|
||||
let TestEnv {
|
||||
dir: _dir,
|
||||
table_engine,
|
||||
procedure_manager,
|
||||
catalog_manager,
|
||||
} = env;
|
||||
let procedure =
|
||||
TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
|
||||
watcher.changed().await.unwrap();
|
||||
|
||||
assert!(catalog_manager
|
||||
.table_exist(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
|
||||
.await
|
||||
.unwrap());
|
||||
|
||||
let ctx = EngineContext::default();
|
||||
assert!(table_engine.table_exists(&ctx, table_id));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_truncate_not_exists_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let TestEnv {
|
||||
dir: _,
|
||||
table_engine,
|
||||
procedure_manager: _,
|
||||
catalog_manager,
|
||||
} = TestEnv::new("truncate");
|
||||
let table_name = "test_truncate";
|
||||
|
||||
let request = TruncateTableRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: table_name.to_string(),
|
||||
table_id: 0,
|
||||
};
|
||||
|
||||
let mut procedure =
|
||||
TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
|
||||
assert!(procedure.on_prepare().await.is_err());
|
||||
}
|
||||
}
|
||||
@@ -169,6 +169,13 @@ pub trait TableEngineProcedure: Send + Sync {
|
||||
ctx: &EngineContext,
|
||||
request: DropTableRequest,
|
||||
) -> Result<BoxedProcedure>;
|
||||
|
||||
/// Returns a procedure that truncates a table by specific `request`.
|
||||
fn truncate_table_procedure(
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
request: TruncateTableRequest,
|
||||
) -> Result<BoxedProcedure>;
|
||||
}
|
||||
|
||||
pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>;
|
||||
|
||||
@@ -111,7 +111,7 @@ impl TableEngine for MockTableEngine {
|
||||
_ctx: &EngineContext,
|
||||
_request: TruncateTableRequest,
|
||||
) -> Result<bool> {
|
||||
Ok(true)
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,4 +139,12 @@ impl TableEngineProcedure for MockTableEngine {
|
||||
) -> Result<BoxedProcedure> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn truncate_table_procedure(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
_request: TruncateTableRequest,
|
||||
) -> Result<BoxedProcedure> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user