refactor: remove table procedure (#2359)

remove table procedure

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-12 06:24:51 -05:00
parent 1ad5f6e5d5
commit eeecce4623
11 changed files with 0 additions and 1542 deletions

25
Cargo.lock generated
View File

@@ -2676,7 +2676,6 @@ dependencies = [
"store-api",
"substrait 0.4.0-nightly",
"table",
"table-procedure",
"tokio",
"tokio-stream",
"toml 0.7.6",
@@ -9515,30 +9514,6 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "table-procedure"
version = "0.4.0-nightly"
dependencies = [
"async-trait",
"catalog",
"common-catalog",
"common-error",
"common-procedure",
"common-procedure-test",
"common-telemetry",
"common-test-util",
"datatypes",
"log-store",
"mito",
"object-store",
"serde",
"serde_json",
"snafu",
"storage",
"table",
"tokio",
]
[[package]]
name = "tagptr"
version = "0.2.0"

View File

@@ -48,7 +48,6 @@ members = [
"src/storage",
"src/store-api",
"src/table",
"src/table-procedure",
# TODO: add this back once the region server is available
# "tests-integration",
"tests/runner",

View File

@@ -66,7 +66,6 @@ storage = { workspace = true }
store-api = { workspace = true }
substrait = { workspace = true }
table = { workspace = true }
table-procedure = { workspace = true }
tokio-stream = { version = "0.1", features = ["net"] }
tokio.workspace = true
toml.workspace = true

View File

@@ -1,27 +0,0 @@
[package]
name = "table-procedure"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
catalog = { workspace = true }
common-error = { workspace = true }
common-procedure = { workspace = true }
common-telemetry = { workspace = true }
datatypes = { workspace = true }
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
table = { workspace = true }
[dev-dependencies]
common-catalog = { workspace = true }
common-procedure-test = { workspace = true }
common-test-util = { workspace = true }
log-store = { workspace = true }
mito = { workspace = true }
object-store = { workspace = true }
storage = { workspace = true }
tokio.workspace = true

View File

@@ -1,266 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedure to alter a table.
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_procedure::error::SubprocedureFailedSnafu;
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Result, Status,
};
use common_telemetry::logging;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
use table::metadata::TableId;
use table::requests::{AlterKind, AlterTableRequest};
use crate::error::{
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableExistsSnafu,
TableNotFoundSnafu,
};
/// Procedure to alter a table.
///
/// `execute()` dispatches on the persisted [AlterTableState]: the prepare step validates
/// the request against the catalog, then the engine step delegates the alteration to a
/// subprocedure created through `engine_procedure`.
pub struct AlterTableProcedure {
/// Serializable state of the procedure; this is what `dump()` writes out as JSON.
data: AlterTableData,
/// Catalog used by the prepare step to look up the table and, for renames, the new name.
catalog_manager: CatalogManagerRef,
/// Creates the engine-level alter subprocedure.
engine_procedure: TableEngineProcedureRef,
}
#[async_trait]
impl Procedure for AlterTableProcedure {
    /// Name under which the loader of this procedure is registered.
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the step that matches the current [AlterTableState].
    async fn execute(&mut self, ctx: &Context) -> Result<Status> {
        // No more need to "rename table in catalog", because the table metadata is now stored
        // in kv backend, and updated by the unified DDL procedure soon. For ordinary tables,
        // catalog manager will be a readonly proxy.
        match self.data.state {
            AlterTableState::Prepare => self.on_prepare().await,
            AlterTableState::EngineAlterTable => self.on_engine_alter_table(ctx).await,
        }
    }

    /// Serializes the procedure data to JSON.
    fn dump(&self) -> Result<String> {
        Ok(serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?)
    }

    /// Locks the whole table; a rename additionally locks the new table name.
    fn lock_key(&self) -> LockKey {
        let data = &self.data;
        let current = data.table_ref().to_string();

        match &data.request.alter_kind {
            AlterKind::RenameTable { new_table_name } => {
                // The renamed table lives in the same catalog and schema.
                let renamed = TableReference {
                    catalog: &data.request.catalog_name,
                    schema: &data.request.schema_name,
                    table: new_table_name,
                }
                .to_string();
                LockKey::new([current, renamed])
            }
            _ => LockKey::single(current),
        }
    }
}
impl AlterTableProcedure {
    // NOTE(review): this name uses a single ':' separator while the create and drop
    // procedures in this crate use "::". It is kept unchanged because it is the key the
    // loader is registered under.
    const TYPE_NAME: &str = "table-procedure:AlterTableProcedure";

    /// Returns a new [AlterTableProcedure] that starts in the [AlterTableState::Prepare] state.
    pub fn new(
        request: AlterTableRequest,
        catalog_manager: CatalogManagerRef,
        engine_procedure: TableEngineProcedureRef,
    ) -> AlterTableProcedure {
        let data = AlterTableData {
            state: AlterTableState::Prepare,
            request,
            table_id: None,
            subprocedure_id: None,
        };

        AlterTableProcedure {
            data,
            catalog_manager,
            engine_procedure,
        }
    }

    /// Register the loader of this procedure to the `procedure_manager`.
    ///
    /// # Panics
    /// Panics on error.
    pub fn register_loader(
        catalog_manager: CatalogManagerRef,
        engine_procedure: TableEngineProcedureRef,
        procedure_manager: &dyn ProcedureManager,
    ) {
        procedure_manager
            .register_loader(
                Self::TYPE_NAME,
                Box::new(move |json| {
                    // Each recovered procedure gets its own handles.
                    let procedure = Self::from_json(
                        json,
                        catalog_manager.clone(),
                        engine_procedure.clone(),
                    )?;
                    Ok(Box::new(procedure) as _)
                }),
            )
            .unwrap()
    }

    /// Recover the procedure from the JSON produced by `dump()`.
    fn from_json(
        json: &str,
        catalog_manager: CatalogManagerRef,
        engine_procedure: TableEngineProcedureRef,
    ) -> Result<Self> {
        let data =
            serde_json::from_str::<AlterTableData>(json).context(DeserializeProcedureSnafu)?;

        Ok(Self {
            data,
            catalog_manager,
            engine_procedure,
        })
    }

    /// Validates the request: the table must exist and, for a rename, the new name must be
    /// free. On success records the table id, assigns the subprocedure id and moves to
    /// [AlterTableState::EngineAlterTable].
    async fn on_prepare(&mut self) -> Result<Status> {
        let request = &self.data.request;
        let (catalog, schema) = (&request.catalog_name, &request.schema_name);

        let table = self
            .catalog_manager
            .table(catalog, schema, &request.table_name)
            .await
            .context(AccessCatalogSnafu)?
            .with_context(|| TableNotFoundSnafu {
                name: format!("{}.{}.{}", catalog, schema, request.table_name),
            })?;

        if let AlterKind::RenameTable { new_table_name } = &request.alter_kind {
            let existing = self
                .catalog_manager
                .table(catalog, schema, new_table_name)
                .await
                .context(AccessCatalogSnafu)?;
            ensure!(
                existing.is_none(),
                TableExistsSnafu {
                    name: format!("{}.{}.{}", catalog, schema, new_table_name),
                }
            );
        }

        // Set the table id.
        let table_id = table.table_info().ident.table_id;
        self.data.table_id = Some(table_id);
        // Assign procedure id to the subprocedure.
        self.data.subprocedure_id = Some(ProcedureId::random());
        self.data.state = AlterTableState::EngineAlterTable;

        Ok(Status::executing(true))
    }

    /// Submits the engine subprocedure if it is not known yet, otherwise translates its
    /// state into the status of this procedure.
    async fn on_engine_alter_table(&mut self, ctx: &Context) -> Result<Status> {
        // Safety: subprocedure id is always set in this state.
        let sub_id = self.data.subprocedure_id.unwrap();

        // Query subprocedure state.
        match ctx.provider.procedure_state(sub_id).await? {
            None => {
                logging::info!(
                    "On engine alter table {}, subprocedure not found, sub_id: {}",
                    self.data.request.table_name,
                    sub_id
                );

                // If the subprocedure is not found, we create a new subprocedure with the same id.
                let engine_ctx = EngineContext::default();
                let procedure = self
                    .engine_procedure
                    .alter_table_procedure(&engine_ctx, self.data.request.clone())
                    .map_err(Error::from_error_ext)?;
                let subprocedure = ProcedureWithId {
                    id: sub_id,
                    procedure,
                };

                Ok(Status::Suspended {
                    subprocedures: vec![subprocedure],
                    persist: true,
                })
            }
            Some(ProcedureState::Running | ProcedureState::Retrying { .. }) => {
                Ok(Status::Suspended {
                    subprocedures: Vec::new(),
                    persist: false,
                })
            }
            Some(ProcedureState::Done) => {
                logging::info!(
                    "On engine alter table {}, done, sub_id: {}",
                    self.data.request.table_name,
                    sub_id
                );
                Ok(Status::Done)
            }
            Some(ProcedureState::Failed { error }) => {
                // Return error if the subprocedure is failed.
                Err(error).context(SubprocedureFailedSnafu {
                    subprocedure_id: sub_id,
                })?
            }
        }
    }
}
/// Represents each step while altering a table in the datanode.
///
/// The state is a field of [AlterTableData] and is serialized together with it.
#[derive(Debug, Serialize, Deserialize)]
enum AlterTableState {
/// Validate request and prepare to alter table.
Prepare,
/// Alter table in the table engine.
EngineAlterTable,
}
/// Serializable data of [AlterTableProcedure].
///
/// This is the value that `dump()` serializes to JSON and `from_json()` restores.
#[derive(Debug, Serialize, Deserialize)]
struct AlterTableData {
/// Current state.
state: AlterTableState,
/// Request to alter this table.
request: AlterTableRequest,
/// Id of the table.
///
/// Available after [AlterTableState::Prepare] state.
table_id: Option<TableId>,
/// Id of the subprocedure to alter this table in the engine.
///
/// This id is `Some` while the procedure is in [AlterTableState::EngineAlterTable]
/// state.
subprocedure_id: Option<ProcedureId>,
}
impl AlterTableData {
    /// Borrows the catalog, schema and table name of the request as a [TableReference].
    fn table_ref(&self) -> TableReference {
        let request = &self.request;

        TableReference {
            catalog: &request.catalog_name,
            schema: &request.schema_name,
            table: &request.table_name,
        }
    }
}

View File

@@ -1,416 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedure to create a table.
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_procedure::error::SubprocedureFailedSnafu;
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Result, Status,
};
use common_telemetry::logging;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineProcedureRef, TableEngineRef, TableReference};
use table::requests::{CreateTableRequest, OpenTableRequest};
use crate::error::{
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableExistsSnafu,
};
/// Procedure to create a table.
///
/// `execute()` dispatches on the persisted [CreateTableState]: prepare, create the table
/// in the engine through a subprocedure, then the final "register catalog" step.
pub struct CreateTableProcedure {
/// Serializable state of the procedure; this is what `dump()` writes out as JSON.
data: CreateTableData,
/// Catalog used to check whether the table already exists.
catalog_manager: CatalogManagerRef,
/// Engine used by the last step to open the created table.
table_engine: TableEngineRef,
/// Creates the engine-level create subprocedure.
engine_procedure: TableEngineProcedureRef,
}
#[async_trait]
impl Procedure for CreateTableProcedure {
    /// Name under which the loader of this procedure is registered.
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the step that matches the current [CreateTableState].
    async fn execute(&mut self, ctx: &Context) -> Result<Status> {
        match self.data.state {
            CreateTableState::Prepare => self.on_prepare().await,
            CreateTableState::EngineCreateTable => self.on_engine_create_table(ctx).await,
            CreateTableState::RegisterCatalog => self.on_register_catalog().await,
        }
    }

    /// Serializes the procedure data to JSON.
    fn dump(&self) -> Result<String> {
        Ok(serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?)
    }

    /// Locks the whole table.
    fn lock_key(&self) -> LockKey {
        LockKey::single(self.data.table_ref().to_string())
    }
}
impl CreateTableProcedure {
    const TYPE_NAME: &str = "table-procedure::CreateTableProcedure";

    /// Returns a new [CreateTableProcedure] that starts in the [CreateTableState::Prepare] state.
    pub fn new(
        request: CreateTableRequest,
        catalog_manager: CatalogManagerRef,
        table_engine: TableEngineRef,
        engine_procedure: TableEngineProcedureRef,
    ) -> CreateTableProcedure {
        CreateTableProcedure {
            data: CreateTableData {
                state: CreateTableState::Prepare,
                request,
                subprocedure_id: None,
            },
            catalog_manager,
            table_engine,
            engine_procedure,
        }
    }

    /// Register the loader of this procedure to the `procedure_manager`.
    ///
    /// Note that `engine_procedure` precedes `table_engine` here, unlike in [Self::new].
    ///
    /// # Panics
    /// Panics on error.
    pub fn register_loader(
        catalog_manager: CatalogManagerRef,
        engine_procedure: TableEngineProcedureRef,
        table_engine: TableEngineRef,
        procedure_manager: &dyn ProcedureManager,
    ) {
        procedure_manager
            .register_loader(
                Self::TYPE_NAME,
                Box::new(move |data| {
                    Self::from_json(
                        data,
                        catalog_manager.clone(),
                        table_engine.clone(),
                        engine_procedure.clone(),
                    )
                    .map(|p| Box::new(p) as _)
                }),
            )
            .unwrap()
    }

    /// Recover the procedure from the JSON produced by `dump()`.
    fn from_json(
        json: &str,
        catalog_manager: CatalogManagerRef,
        table_engine: TableEngineRef,
        engine_procedure: TableEngineProcedureRef,
    ) -> Result<Self> {
        let data: CreateTableData =
            serde_json::from_str(json).context(DeserializeProcedureSnafu)?;

        Ok(CreateTableProcedure {
            data,
            catalog_manager,
            table_engine,
            engine_procedure,
        })
    }

    /// Checks whether the table already exists.
    ///
    /// If it does, the procedure finishes when `create_if_not_exists` is set and fails with
    /// a "table exists" error otherwise. If it does not, the subprocedure id is assigned
    /// and the procedure moves to [CreateTableState::EngineCreateTable].
    async fn on_prepare(&mut self) -> Result<Status> {
        let request = &self.data.request;
        let table_exists = self
            .catalog_manager
            .table_exist(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
            )
            .await
            .context(AccessCatalogSnafu)?;

        if table_exists {
            if request.create_if_not_exists {
                return Ok(Status::Done);
            }
            // Report the table reference (the same string the lock key is built from) so
            // the error is unambiguous across catalogs and schemas, as the alter procedure
            // does.
            return TableExistsSnafu {
                name: self.data.table_ref().to_string(),
            }
            .fail()
            .map_err(Into::into);
        }

        self.data.state = CreateTableState::EngineCreateTable;
        // Assign procedure id to the subprocedure.
        self.data.subprocedure_id = Some(ProcedureId::random());

        Ok(Status::executing(true))
    }

    /// Submits the engine subprocedure if it is not known yet, otherwise translates its
    /// state into the status of this procedure.
    async fn on_engine_create_table(&mut self, ctx: &Context) -> Result<Status> {
        // Safety: subprocedure id is always set in this state.
        let sub_id = self.data.subprocedure_id.unwrap();

        // Query subprocedure state.
        let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else {
            // We need to submit the subprocedure if it doesn't exist. We always need to
            // do this check as we might not have submitted the subprocedure yet when the
            // manager recovers this procedure from the procedure store.
            logging::info!(
                "On engine create table {}, table_id: {}, subprocedure not found, sub_id: {}",
                self.data.request.table_name,
                self.data.request.id,
                sub_id
            );

            // If the subprocedure is not found, we create a new subprocedure with the same id.
            let engine_ctx = EngineContext::default();
            let procedure = self
                .engine_procedure
                .create_table_procedure(&engine_ctx, self.data.request.clone())
                .map_err(Error::from_error_ext)?;
            return Ok(Status::Suspended {
                subprocedures: vec![ProcedureWithId {
                    id: sub_id,
                    procedure,
                }],
                persist: true,
            });
        };

        match sub_state {
            ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
                subprocedures: Vec::new(),
                persist: false,
            }),
            ProcedureState::Done => {
                logging::info!(
                    "On engine create table {}, table_id: {}, done, sub_id: {}",
                    self.data.request.table_name,
                    self.data.request.id,
                    sub_id
                );
                // The subprocedure is done, we can execute the next step.
                self.data.state = CreateTableState::RegisterCatalog;
                Ok(Status::executing(true))
            }
            ProcedureState::Failed { error } => {
                // Return error if the subprocedure is failed.
                Err(error).context(SubprocedureFailedSnafu {
                    subprocedure_id: sub_id,
                })?
            }
        }
    }

    /// Final step: finishes immediately if the catalog already reports the table,
    /// otherwise opens the table in the table engine and finishes.
    async fn on_register_catalog(&mut self) -> Result<Status> {
        let request = &self.data.request;

        // Wrap catalog failures the same way `on_prepare` does.
        let table_exists = self
            .catalog_manager
            .table_exist(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
            )
            .await
            .context(AccessCatalogSnafu)?;
        if table_exists {
            return Ok(Status::Done);
        }

        // If we recover the procedure from json, then the table engine hasn't opened this
        // table yet, so we need to use `open_table()` instead of `get_table()`.
        let engine_ctx = EngineContext::default();
        let open_req = OpenTableRequest {
            catalog_name: request.catalog_name.clone(),
            schema_name: request.schema_name.clone(),
            table_name: request.table_name.clone(),
            table_id: request.id,
            region_numbers: request.region_numbers.clone(),
        };
        // Safety: The table is already created.
        let _ = self
            .table_engine
            .open_table(&engine_ctx, open_req)
            .await
            .map_err(Error::from_error_ext)?
            .unwrap();

        Ok(Status::Done)
    }
}
/// Represents each step while creating a table in the datanode.
///
/// The state is a field of [CreateTableData] and is serialized together with it.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
enum CreateTableState {
/// Validate request and prepare to create table.
Prepare,
/// Create table in the table engine.
EngineCreateTable,
/// Register the table to the catalog.
RegisterCatalog,
}
/// Serializable data of [CreateTableProcedure].
///
/// This is the value that `dump()` serializes to JSON and `from_json()` restores.
#[derive(Debug, Serialize, Deserialize)]
struct CreateTableData {
/// Current state.
state: CreateTableState,
/// Request to create this table.
request: CreateTableRequest,
/// Id of the subprocedure to create this table in the engine.
///
/// This id is `Some` while the procedure is in [CreateTableState::EngineCreateTable]
/// state.
subprocedure_id: Option<ProcedureId>,
}
impl CreateTableData {
    /// Borrows the catalog, schema and table name of the request as a [TableReference].
    fn table_ref(&self) -> TableReference {
        let request = &self.request;

        TableReference {
            catalog: &request.catalog_name,
            schema: &request.schema_name,
            table: &request.table_name,
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use common_procedure_test::{
execute_procedure_once, execute_procedure_until_done, execute_until_suspended_or_done,
MockContextProvider,
};
use table::engine::{EngineContext, TableEngine};
use super::*;
use crate::test_util::{self, TestEnv};
/// Submits a create procedure to the procedure manager and checks that the table
/// becomes visible in the table engine.
#[tokio::test]
async fn test_create_table_procedure() {
// `_dir` keeps the temporary directory bound for the whole test.
let TestEnv {
dir: _dir,
table_engine,
procedure_manager,
catalog_manager,
} = TestEnv::new("create");
let table_name = "test_create";
let request = test_util::new_create_request(table_name);
// The same engine instance is passed as both the table engine and the engine procedure.
let procedure = CreateTableProcedure::new(
request.clone(),
catalog_manager,
table_engine.clone(),
table_engine.clone(),
);
// The table must not exist before the procedure runs.
let engine_ctx = EngineContext::default();
assert!(table_engine
.get_table(&engine_ctx, request.id)
.unwrap()
.is_none());
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
// Wait for the procedure's state to change before checking the engine.
watcher.changed().await.unwrap();
assert!(table_engine
.get_table(&engine_ctx, request.id)
.unwrap()
.is_some());
}
/// Drives the procedure by hand up to the `RegisterCatalog` state, reopens the test
/// environment on the same directory, recovers the procedure from its JSON dump and
/// checks that it can finish.
#[tokio::test]
async fn test_recover_register_catalog() {
common_telemetry::init_default_ut_logging();
// `dir` is bound here because it is reused by `TestEnv::from_temp_dir` below.
let TestEnv {
dir,
table_engine,
procedure_manager: _,
catalog_manager,
} = TestEnv::new("create");
let table_name = "test_create";
let request = test_util::new_create_request(table_name);
let procedure = CreateTableProcedure::new(
request.clone(),
catalog_manager,
table_engine.clone(),
table_engine.clone(),
);
let engine_ctx = EngineContext::default();
assert!(table_engine
.get_table(&engine_ctx, request.id)
.unwrap()
.is_none());
let procedure_id = ProcedureId::random();
let mut procedure = Box::new(procedure);
// Execute until suspended. We use an empty provider so the parent submits
// a new subprocedure, as it can't find the subprocedure.
let mut subprocedures = execute_until_suspended_or_done(
procedure_id,
MockContextProvider::default(),
&mut procedure,
)
.await
.unwrap();
assert_eq!(1, subprocedures.len());
// Execute the subprocedure.
let mut subprocedure = subprocedures.pop().unwrap();
execute_procedure_until_done(&mut subprocedure.procedure).await;
// Report the subprocedure as done to the parent through the mock provider.
let states = HashMap::from([(subprocedure.id, ProcedureState::Done)]);
// Execute the parent procedure once.
let _ = execute_procedure_once(
procedure_id,
MockContextProvider::new(states),
&mut procedure,
)
.await;
assert_eq!(CreateTableState::RegisterCatalog, procedure.data.state);
// Close the table engine and reopen the TestEnv.
table_engine.close().await.unwrap();
let TestEnv {
dir: _dir,
table_engine,
procedure_manager: _,
catalog_manager,
} = TestEnv::from_temp_dir(dir);
// Recover the procedure
let json = procedure.dump().unwrap();
let procedure = CreateTableProcedure::from_json(
&json,
catalog_manager,
table_engine.clone(),
table_engine.clone(),
)
.unwrap();
let mut procedure = Box::new(procedure);
assert_eq!(CreateTableState::RegisterCatalog, procedure.data.state);
// Execute until done.
execute_procedure_until_done(&mut procedure).await;
// The table is created.
assert!(table_engine
.get_table(&engine_ctx, request.id)
.unwrap()
.is_some());
}
}

View File

@@ -1,270 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedure to drop a table.
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_procedure::error::SubprocedureFailedSnafu;
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Result, Status,
};
use common_telemetry::logging;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
use table::requests::DropTableRequest;
use crate::error::{
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu,
};
/// Procedure to drop a table.
///
/// `execute()` dispatches on the persisted [DropTableState]; the actual drop is delegated
/// to a subprocedure created through `engine_procedure`.
pub struct DropTableProcedure {
/// Serializable state of the procedure; this is what `dump()` writes out as JSON.
data: DropTableData,
/// Catalog used by the prepare step to check that the table exists.
catalog_manager: CatalogManagerRef,
/// Creates the engine-level drop subprocedure.
engine_procedure: TableEngineProcedureRef,
}
#[async_trait]
impl Procedure for DropTableProcedure {
    /// Name under which the loader of this procedure is registered.
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the step that matches the current [DropTableState].
    async fn execute(&mut self, ctx: &Context) -> Result<Status> {
        match self.data.state {
            DropTableState::Prepare => self.on_prepare().await,
            DropTableState::RemoveFromCatalog => self.on_remove_from_catalog().await,
            DropTableState::EngineDropTable => self.on_engine_drop_table(ctx).await,
        }
    }

    /// Serializes the procedure data to JSON.
    fn dump(&self) -> Result<String> {
        Ok(serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?)
    }

    /// Locks the whole table.
    fn lock_key(&self) -> LockKey {
        LockKey::single(self.data.table_ref().to_string())
    }
}
impl DropTableProcedure {
    const TYPE_NAME: &str = "table-procedure::DropTableProcedure";

    /// Returns a new [DropTableProcedure] that starts in the [DropTableState::Prepare] state.
    pub fn new(
        request: DropTableRequest,
        catalog_manager: CatalogManagerRef,
        engine_procedure: TableEngineProcedureRef,
    ) -> DropTableProcedure {
        DropTableProcedure {
            data: DropTableData {
                state: DropTableState::Prepare,
                request,
                subprocedure_id: None,
            },
            catalog_manager,
            engine_procedure,
        }
    }

    /// Register the loader of this procedure to the `procedure_manager`.
    ///
    /// # Panics
    /// Panics on error.
    pub fn register_loader(
        catalog_manager: CatalogManagerRef,
        engine_procedure: TableEngineProcedureRef,
        procedure_manager: &dyn ProcedureManager,
    ) {
        procedure_manager
            .register_loader(
                Self::TYPE_NAME,
                Box::new(move |data| {
                    Self::from_json(data, catalog_manager.clone(), engine_procedure.clone())
                        .map(|p| Box::new(p) as _)
                }),
            )
            .unwrap()
    }

    /// Recover the procedure from the JSON produced by `dump()`.
    fn from_json(
        json: &str,
        catalog_manager: CatalogManagerRef,
        engine_procedure: TableEngineProcedureRef,
    ) -> Result<Self> {
        let data: DropTableData = serde_json::from_str(json).context(DeserializeProcedureSnafu)?;

        Ok(DropTableProcedure {
            data,
            catalog_manager,
            engine_procedure,
        })
    }

    /// Ensures the table exists in the catalog, then moves to
    /// [DropTableState::RemoveFromCatalog].
    async fn on_prepare(&mut self) -> Result<Status> {
        let request = &self.data.request;

        // Ensure the table exists.
        let table_exists = self
            .catalog_manager
            .table_exist(
                &request.catalog_name,
                &request.schema_name,
                &request.table_name,
            )
            .await
            .context(AccessCatalogSnafu)?;
        // Report the table reference (the same string the lock key is built from) so the
        // error is unambiguous across catalogs and schemas, as the alter procedure does.
        ensure!(
            table_exists,
            TableNotFoundSnafu {
                name: self.data.table_ref().to_string(),
            }
        );

        self.data.state = DropTableState::RemoveFromCatalog;

        Ok(Status::executing(true))
    }

    /// Transition step: performs no catalog operation itself; it only assigns the
    /// subprocedure id and moves to [DropTableState::EngineDropTable].
    async fn on_remove_from_catalog(&mut self) -> Result<Status> {
        self.data.state = DropTableState::EngineDropTable;
        // Assign procedure id to the subprocedure.
        self.data.subprocedure_id = Some(ProcedureId::random());

        Ok(Status::executing(true))
    }

    /// Submits the engine subprocedure if it is not known yet, otherwise translates its
    /// state into the status of this procedure.
    async fn on_engine_drop_table(&mut self, ctx: &Context) -> Result<Status> {
        // Safety: subprocedure id is always set in this state.
        let sub_id = self.data.subprocedure_id.unwrap();

        // Query subprocedure state.
        let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else {
            logging::info!(
                "On engine drop table {}, subprocedure not found, sub_id: {}",
                self.data.request.table_name,
                sub_id
            );

            // If the subprocedure is not found, we create a new subprocedure with the same id.
            let engine_ctx = EngineContext::default();
            let procedure = self
                .engine_procedure
                .drop_table_procedure(&engine_ctx, self.data.request.clone())
                .map_err(Error::from_error_ext)?;
            return Ok(Status::Suspended {
                subprocedures: vec![ProcedureWithId {
                    id: sub_id,
                    procedure,
                }],
                persist: true,
            });
        };

        match sub_state {
            ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
                subprocedures: Vec::new(),
                persist: false,
            }),
            ProcedureState::Done => {
                logging::info!(
                    "On engine drop table {}, done, sub_id: {}",
                    self.data.request.table_name,
                    sub_id
                );
                Ok(Status::Done)
            }
            ProcedureState::Failed { error } => {
                // Return error if the subprocedure is failed.
                Err(error).context(SubprocedureFailedSnafu {
                    subprocedure_id: sub_id,
                })?
            }
        }
    }
}
/// Represents each step while dropping a table in the datanode.
///
/// The state is a field of [DropTableData] and is serialized together with it.
#[derive(Debug, Serialize, Deserialize)]
enum DropTableState {
/// Validate request and prepare to drop table.
Prepare,
/// Remove the table from the catalog.
RemoveFromCatalog,
/// Drop table in the table engine.
EngineDropTable,
}
/// Serializable data of [DropTableProcedure].
///
/// This is the value that `dump()` serializes to JSON and `from_json()` restores.
#[derive(Debug, Serialize, Deserialize)]
struct DropTableData {
/// Current state.
state: DropTableState,
/// Request to drop this table.
request: DropTableRequest,
/// Id of the subprocedure to drop this table from the engine.
///
/// This id is `Some` while the procedure is in [DropTableState::EngineDropTable]
/// state.
subprocedure_id: Option<ProcedureId>,
}
impl DropTableData {
    /// Borrows the catalog, schema and table name of the request as a [TableReference].
    fn table_ref(&self) -> TableReference {
        let request = &self.request;

        TableReference {
            catalog: &request.catalog_name,
            schema: &request.schema_name,
            table: &request.table_name,
        }
    }
}
#[cfg(test)]
mod tests {
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

    use super::*;
    use crate::test_util::TestEnv;

    /// Preparing to drop a table that the catalog does not know must fail.
    #[tokio::test]
    async fn test_drop_not_exists_table() {
        common_telemetry::init_default_ut_logging();

        let TestEnv {
            dir: _,
            table_engine,
            procedure_manager: _,
            catalog_manager,
        } = TestEnv::new("drop");

        // Nothing was ever created under this name.
        let request = DropTableRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "test_drop".to_string(),
            table_id: 0,
        };
        let mut procedure =
            DropTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());

        let prepared = procedure.on_prepare().await;
        assert!(prepared.is_err());
    }
}

View File

@@ -1,84 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use snafu::{Location, Snafu};
/// Errors raised by the table procedures of this crate.
///
/// The generated context selectors are visible crate-wide (`pub(crate)`).
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
/// The procedure data could not be serialized to JSON.
#[snafu(display("Failed to serialize procedure to json, source: {}", source))]
SerializeProcedure {
source: serde_json::Error,
location: Location,
},
/// The procedure data could not be restored from JSON.
#[snafu(display("Failed to deserialize procedure from json, source: {}", source))]
DeserializeProcedure {
source: serde_json::Error,
location: Location,
},
/// A raw schema was rejected by the `datatypes` crate.
#[snafu(display("Invalid raw schema, source: {}", source))]
InvalidRawSchema {
location: Location,
source: datatypes::error::Error,
},
/// A catalog manager call failed.
#[snafu(display("Failed to access catalog, source: {}", source))]
AccessCatalog {
location: Location,
source: catalog::error::Error,
},
/// The named table does not exist.
#[snafu(display("Table {} not found", name))]
TableNotFound { name: String },
/// The named table already exists.
#[snafu(display("Table already exists: {}", name))]
TableExists { name: String },
/// The named table could not be deregistered.
#[snafu(display("Failed to deregister table: {}", name))]
DeregisterTable { name: String },
}
/// Result type whose error is this crate's [Error].
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
DeregisterTable { .. } | SerializeProcedure { .. } | DeserializeProcedure { .. } => {
StatusCode::Internal
}
InvalidRawSchema { source, .. } => source.status_code(),
AccessCatalog { source, .. } => source.status_code(),
TableNotFound { .. } => StatusCode::TableNotFound,
TableExists { .. } => StatusCode::TableAlreadyExists,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::from_error_ext(e)
}
}

View File

@@ -1,62 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedures for table operations.
mod alter;
mod create;
mod drop;
pub mod error;
mod truncate;
pub use alter::AlterTableProcedure;
use catalog::CatalogManagerRef;
use common_procedure::ProcedureManager;
pub use create::CreateTableProcedure;
pub use drop::DropTableProcedure;
use table::engine::{TableEngineProcedureRef, TableEngineRef};
pub use truncate::TruncateTableProcedure;
/// Register the loaders of the create, alter, drop and truncate procedures to the
/// procedure manager.
///
/// # Panics
/// Panics on error.
#[allow(clippy::items_after_test_module)]
pub fn register_procedure_loaders(
    catalog_manager: CatalogManagerRef,
    engine_procedure: TableEngineProcedureRef,
    table_engine: TableEngineRef,
    procedure_manager: &dyn ProcedureManager,
) {
    // Only the create procedure needs the table engine itself.
    CreateTableProcedure::register_loader(
        catalog_manager.clone(),
        engine_procedure.clone(),
        table_engine,
        procedure_manager,
    );

    AlterTableProcedure::register_loader(
        catalog_manager.clone(),
        engine_procedure.clone(),
        procedure_manager,
    );

    DropTableProcedure::register_loader(
        catalog_manager.clone(),
        engine_procedure.clone(),
        procedure_manager,
    );

    // The last registration takes ownership of the remaining handles.
    TruncateTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager);
}
#[cfg(test)]
mod test_util;

View File

@@ -1,127 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::{MitoEngine, MITO_ENGINE};
use object_store::services::Fs;
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::requests::CreateTableRequest;
/// Test fixture bundling everything the table procedure tests need.
///
/// Built by `TestEnv::new` or `TestEnv::from_temp_dir`.
pub struct TestEnv {
/// Temporary directory under which the `db` and `procedure` store roots are placed.
pub dir: TempDir,
/// Mito table engine backed by a storage engine that uses a [NoopLogStore].
pub table_engine: Arc<MitoEngine<EngineImpl<NoopLogStore>>>,
/// Local procedure manager whose state store is rooted at the `procedure` directory.
pub procedure_manager: ProcedureManagerRef,
/// In-memory catalog manager created with its default setup.
pub catalog_manager: CatalogManagerRef,
}
impl TestEnv {
    /// Creates a test environment in a temporary directory obtained from
    /// `create_temp_dir(prefix)`.
    pub fn new(prefix: &str) -> TestEnv {
        let dir = create_temp_dir(prefix);
        TestEnv::from_temp_dir(dir)
    }

    /// Creates a test environment that stores its data under `dir`.
    ///
    /// Two file-system object stores are rooted below `dir`:
    /// - `<dir>/db` holds the data of the storage engine and the mito table engine,
    /// - `<dir>/procedure` holds the state of the procedure manager.
    ///
    /// The returned [TestEnv] owns `dir`.
    ///
    /// # Panics
    /// Panics if an object store or the storage engine cannot be created.
    pub fn from_temp_dir(dir: TempDir) -> TestEnv {
        // Object store and engines for table data.
        let store_dir = format!("{}/db", dir.path().to_string_lossy());
        let mut builder = Fs::default();
        let _ = builder.root(&store_dir);
        let object_store = ObjectStore::new(builder).unwrap().finish();

        let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
        let storage_engine = EngineImpl::new(
            StorageEngineConfig::default(),
            Arc::new(NoopLogStore),
            object_store.clone(),
            compaction_scheduler,
        )
        .unwrap();
        let table_engine = Arc::new(MitoEngine::new(
            EngineConfig::default(),
            storage_engine,
            object_store,
        ));

        // Separate object store for the persisted procedure state.
        let procedure_dir = format!("{}/procedure", dir.path().to_string_lossy());
        let mut builder = Fs::default();
        let _ = builder.root(&procedure_dir);
        let object_store = ObjectStore::new(builder).unwrap().finish();

        let config = ManagerConfig {
            max_retry_times: 3,
            // Keep the retry delay short: a delay measured in hundreds of seconds would
            // stall any test that exercises a retry for several minutes.
            retry_delay: Duration::from_millis(500),
            ..Default::default()
        };
        let state_store = Arc::new(ObjectStateStore::new(object_store));
        let procedure_manager = Arc::new(LocalManager::new(config, state_store));

        let catalog_manager = MemoryCatalogManager::with_default_setup();

        TestEnv {
            dir,
            table_engine,
            procedure_manager,
            catalog_manager,
        }
    }
}
/// Builds the raw schema used by the tests: a `host` key column, the `cpu` and
/// `memory` value columns and a millisecond `ts` column marked as the time index.
pub fn schema_for_test() -> RawSchema {
    // Key
    let host = ColumnSchema::new("host", ConcreteDataType::string_datatype(), false);
    // Nullable value column: cpu
    let cpu = ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true);
    // Non-null value column: memory
    let memory = ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false);
    // Time index column
    let ts = ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    )
    .with_time_index(true);

    RawSchema::new(vec![host, cpu, memory, ts])
}
/// Builds a [CreateTableRequest] for a table called `table_name` in the default
/// catalog and schema, using the schema from [schema_for_test] and the mito engine.
pub fn new_create_request(table_name: &str) -> CreateTableRequest {
    let name = String::from(table_name);
    let description = String::from("a test table");
    let schema = schema_for_test();

    CreateTableRequest {
        id: 1,
        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
        schema_name: DEFAULT_SCHEMA_NAME.to_string(),
        table_name: name,
        desc: Some(description),
        schema,
        region_numbers: vec![0, 1],
        create_if_not_exists: true,
        // The first column of the test schema (`host`) is the primary key.
        primary_key_indices: vec![0],
        table_options: Default::default(),
        engine: MITO_ENGINE.to_string(),
    }
}

View File

@@ -1,263 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedure to truncate a table.
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_procedure::error::SubprocedureFailedSnafu;
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Result, Status,
};
use common_telemetry::logging;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
use table::requests::TruncateTableRequest;
use crate::error::{
AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu,
};
/// Procedure to truncate a table.
///
/// It first checks in the catalog that the table exists, then submits a subprocedure
/// created by the table engine and waits for that subprocedure to finish.
#[allow(dead_code)]
pub struct TruncateTableProcedure {
/// Serializable state of the procedure (current step, request, subprocedure id).
data: TruncateTableData,
/// Catalog manager used to check that the table exists.
catalog_manager: CatalogManagerRef,
/// Creates the engine-level subprocedure that truncates the table.
engine_procedure: TableEngineProcedureRef,
}
#[async_trait]
impl Procedure for TruncateTableProcedure {
    /// Returns the type name under which the loader of this procedure is registered.
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Runs the step that corresponds to the current state of the procedure.
    async fn execute(&mut self, ctx: &Context) -> Result<Status> {
        match self.data.state {
            TruncateTableState::EngineTruncateTable => self.on_engine_truncate_table(ctx).await,
            TruncateTableState::Prepare => self.on_prepare().await,
        }
    }

    /// Serializes the procedure data to JSON.
    fn dump(&self) -> Result<String> {
        let serialized = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?;
        Ok(serialized)
    }

    /// Returns a lock key built from the table reference.
    fn lock_key(&self) -> LockKey {
        // We lock the whole table.
        LockKey::single(self.data.table_ref().to_string())
    }
}
impl TruncateTableProcedure {
const TYPE_NAME: &str = "table-procedure::TruncateTableProcedure";
/// Returns a new [TruncateTableProcedure].
pub fn new(
request: TruncateTableRequest,
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
) -> TruncateTableProcedure {
TruncateTableProcedure {
data: TruncateTableData {
state: TruncateTableState::Prepare,
request,
subprocedure_id: None,
},
catalog_manager,
engine_procedure,
}
}
/// Register the loader of this procedure to the `procedure_manager`.
///
/// # Panics
/// Panics on error.
pub fn register_loader(
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
procedure_manager: &dyn ProcedureManager,
) {
procedure_manager
.register_loader(
Self::TYPE_NAME,
Box::new(move |data| {
Self::from_json(data, catalog_manager.clone(), engine_procedure.clone())
.map(|p| Box::new(p) as _)
}),
)
.unwrap()
}
/// Recover the procedure from json.
fn from_json(
json: &str,
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
) -> Result<Self> {
let data: TruncateTableData =
serde_json::from_str(json).context(DeserializeProcedureSnafu)?;
Ok(TruncateTableProcedure {
data,
catalog_manager,
engine_procedure,
})
}
async fn on_prepare(&mut self) -> Result<Status> {
let request = &self.data.request;
// Ensure the table exists.
let table_exists = self
.catalog_manager
.table_exist(
&request.catalog_name,
&request.schema_name,
&request.table_name,
)
.await
.context(AccessCatalogSnafu)?;
ensure!(
table_exists,
TableNotFoundSnafu {
name: &request.table_name,
}
);
self.data.state = TruncateTableState::EngineTruncateTable;
// Assign procedure id to the subprocedure.
self.data.subprocedure_id = Some(ProcedureId::random());
Ok(Status::executing(true))
}
async fn on_engine_truncate_table(&mut self, ctx: &Context) -> Result<Status> {
// Safety: subprocedure id is always set in this state.
let sub_id = self.data.subprocedure_id.unwrap();
// Query subprocedure state.
let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else {
logging::info!(
"On engine truncate table {}, subprocedure not found, sub_id: {}",
self.data.request.table_name,
sub_id,
);
// If the subprocedure is not found, we create a new subprocedure with the same id.
let engine_ctx = EngineContext::default();
let procedure = self
.engine_procedure
.truncate_table_procedure(&engine_ctx, self.data.request.clone())
.map_err(Error::from_error_ext)?;
return Ok(Status::Suspended {
subprocedures: vec![ProcedureWithId {
id: sub_id,
procedure,
}],
persist: true,
});
};
match sub_state {
ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
}),
ProcedureState::Done => {
logging::info!(
"On engine truncate table {}, done, sub_id: {}",
self.data.request.table_name,
sub_id
);
Ok(Status::Done)
}
ProcedureState::Failed { error } => {
// Return error if the subprocedure is failed.
Err(error).context(SubprocedureFailedSnafu {
subprocedure_id: sub_id,
})?
}
}
}
}
/// Represents each step while truncating a table in the datanode.
///
/// The state is part of [TruncateTableData] and is serialized together with it.
#[derive(Debug, Serialize, Deserialize)]
enum TruncateTableState {
/// Validate request and prepare to truncate table.
Prepare,
/// Truncate table in the table engine.
EngineTruncateTable,
}
/// Serializable data of [TruncateTableProcedure].
///
/// Written out as JSON when the procedure is dumped and read back when the
/// procedure is recovered from JSON.
#[derive(Debug, Serialize, Deserialize)]
struct TruncateTableData {
/// Current state.
state: TruncateTableState,
/// Request to truncate this table.
request: TruncateTableRequest,
/// Id of the subprocedure to truncate this table from the engine.
///
/// This id is `Some` while the procedure is in [TruncateTableState::EngineTruncateTable]
/// state.
subprocedure_id: Option<ProcedureId>,
}
impl TruncateTableData {
    /// Returns a [TableReference] that borrows the catalog, schema and table names
    /// of the request.
    fn table_ref(&self) -> TableReference {
        let request = &self.request;

        TableReference {
            catalog: &request.catalog_name,
            schema: &request.schema_name,
            table: &request.table_name,
        }
    }
}
#[cfg(test)]
mod tests {
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

    use super::*;
    use crate::test_util::TestEnv;

    /// Preparing to truncate a table that is missing from the catalog must fail.
    #[tokio::test]
    async fn test_truncate_not_exists_table() {
        common_telemetry::init_default_ut_logging();

        // Only the table engine and the catalog manager are needed here.
        let TestEnv {
            table_engine,
            catalog_manager,
            ..
        } = TestEnv::new("truncate");

        // The table is never created, so the catalog does not know it.
        let request = TruncateTableRequest {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "test_truncate".to_string(),
            table_id: 0,
        };

        let mut procedure =
            TruncateTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
        let prepared = procedure.on_prepare().await;
        assert!(prepared.is_err());
    }
}