feat: Add alter table procedure (#1354)

* feat: Implement AlterTableProcedure

* test: Test alter table procedure

* feat: support alter table by procedure in datanode

* chore: update comment
This commit is contained in:
Yingwen
2023-04-13 14:05:53 +08:00
committed by GitHub
parent 9fa871a3fa
commit 0819582a26
9 changed files with 499 additions and 52 deletions

1
Cargo.lock generated
View File

@@ -7876,6 +7876,7 @@ version = "0.1.1"
dependencies = [
"async-trait",
"catalog",
"common-catalog",
"common-error",
"common-procedure",
"common-telemetry",

View File

@@ -54,6 +54,8 @@ pub enum SqlRequest {
pub struct SqlHandler {
table_engine_manager: TableEngineManagerRef,
catalog_manager: CatalogManagerRef,
// TODO(yingwen): Support multiple table engine. We need to add a method
// to TableEngineManagerRef to return engine procedure by engine name.
engine_procedure: TableEngineProcedureRef,
procedure_manager: Option<ProcedureManagerRef>,
}

View File

@@ -13,18 +13,25 @@
// limitations under the License.
use catalog::RenameTableRequest;
use common_procedure::{watcher, ProcedureManagerRef, ProcedureWithId};
use common_query::Output;
use common_telemetry::logging::info;
use snafu::prelude::*;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::column_def_to_schema;
use table::engine::{EngineContext, TableReference};
use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest};
use table_procedure::AlterTableProcedure;
use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
pub(crate) async fn alter(&self, req: AlterTableRequest) -> Result<Output> {
if let Some(procedure_manager) = &self.procedure_manager {
return self.alter_table_by_procedure(procedure_manager, req).await;
}
let ctx = EngineContext {};
let table_name = req.table_name.clone();
let table_ref = TableReference {
@@ -71,6 +78,33 @@ impl SqlHandler {
Ok(Output::AffectedRows(0))
}
pub(crate) async fn alter_table_by_procedure(
&self,
procedure_manager: &ProcedureManagerRef,
req: AlterTableRequest,
) -> Result<Output> {
let table_name = req.table_name.clone();
let procedure = AlterTableProcedure::new(
req,
self.catalog_manager.clone(),
self.engine_procedure.clone(),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Alter table {} by procedure {}", table_name, procedure_id);
let mut watcher = procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu { procedure_id })?;
watcher::wait(&mut watcher)
.await
.context(error::WaitProcedureSnafu { procedure_id })?;
Ok(Output::AffectedRows(0))
}
pub(crate) fn alter_to_request(
&self,
alter_table: AlterTable,
@@ -112,12 +146,14 @@ mod tests {
use std::assert_matches::assert_matches;
use datatypes::prelude::ConcreteDataType;
use query::parser::QueryLanguageParser;
use session::context::QueryContext;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use super::*;
use crate::tests::test_util::create_mock_sql_handler;
use crate::tests::test_util::{create_mock_sql_handler, MockInstance};
fn parse_sql(sql: &str) -> AlterTable {
let mut stmt = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap();
@@ -182,4 +218,35 @@ mod tests {
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn alter_table_by_procedure() {
let instance = MockInstance::with_procedure_enabled("alter_table_by_procedure").await;
// Create table first.
let sql = r#"create table test_alter(
host string,
ts timestamp,
cpu double default 0,
TIME INDEX (ts),
PRIMARY KEY(host)
) engine=mito with(regions=1);"#;
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
let output = instance
.inner()
.execute_stmt(stmt, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
// Alter table.
let sql = r#"alter table test_alter add column memory double"#;
let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
let output = instance
.inner()
.execute_stmt(stmt, QueryContext::arc())
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(0)));
}
}

View File

@@ -17,6 +17,7 @@ snafu.workspace = true
table = { path = "../table" }
[dev-dependencies]
common-catalog = { path = "../common/catalog" }
common-test-util = { path = "../common/test-util" }
log-store = { path = "../log-store" }
mito = { path = "../mito" }

View File

@@ -0,0 +1,343 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Procedure to alter a table.
use async_trait::async_trait;
use catalog::{CatalogManagerRef, RenameTableRequest};
use common_procedure::{
Context, Error, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Result, Status,
};
use common_telemetry::logging;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
use table::metadata::TableId;
use table::requests::{AlterKind, AlterTableRequest};
use crate::error::{
AccessCatalogSnafu, CatalogNotFoundSnafu, DeserializeProcedureSnafu, SchemaNotFoundSnafu,
SerializeProcedureSnafu, SubprocedureFailedSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
pub struct AlterTableProcedure {
data: AlterTableData,
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
}
#[async_trait]
impl Procedure for AlterTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, ctx: &Context) -> Result<Status> {
match self.data.state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::EngineAlterTable => self.on_engine_alter_table(ctx).await,
AlterTableState::RenameInCatalog => self.on_rename_in_catalog().await,
}
}
fn dump(&self) -> Result<String> {
let json = serde_json::to_string(&self.data).context(SerializeProcedureSnafu)?;
Ok(json)
}
fn lock_key(&self) -> LockKey {
// We lock the whole table.
let table_name = self.data.table_ref().to_string();
// If alter kind is rename, we also need to lock the renamed table.
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let new_table_name = TableReference {
catalog: &self.data.request.catalog_name,
schema: &self.data.request.schema_name,
table: new_table_name,
}
.to_string();
LockKey::new([table_name, new_table_name])
} else {
LockKey::single(table_name)
}
}
}
impl AlterTableProcedure {
const TYPE_NAME: &str = "table-procedure:AlterTableProcedure";
/// Returns a new [AlterTableProcedure].
pub fn new(
request: AlterTableRequest,
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
) -> AlterTableProcedure {
AlterTableProcedure {
data: AlterTableData {
state: AlterTableState::Prepare,
request,
subprocedure_id: None,
table_id: None,
},
catalog_manager,
engine_procedure,
}
}
/// Register the loader of this procedure to the `procedure_manager`.
///
/// # Panics
/// Panics on error.
pub fn register_loader(
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
procedure_manager: &dyn ProcedureManager,
) {
procedure_manager
.register_loader(
Self::TYPE_NAME,
Box::new(move |data| {
Self::from_json(data, catalog_manager.clone(), engine_procedure.clone())
.map(|p| Box::new(p) as _)
}),
)
.unwrap()
}
/// Recover the procedure from json.
fn from_json(
json: &str,
catalog_manager: CatalogManagerRef,
engine_procedure: TableEngineProcedureRef,
) -> Result<Self> {
let data: AlterTableData = serde_json::from_str(json).context(DeserializeProcedureSnafu)?;
Ok(AlterTableProcedure {
data,
catalog_manager,
engine_procedure,
})
}
async fn on_prepare(&mut self) -> Result<Status> {
// Check whether catalog and schema exist.
let catalog = self
.catalog_manager
.catalog(&self.data.request.catalog_name)
.context(AccessCatalogSnafu)?
.with_context(|| CatalogNotFoundSnafu {
name: &self.data.request.catalog_name,
})?;
let schema = catalog
.schema(&self.data.request.schema_name)
.context(AccessCatalogSnafu)?
.with_context(|| SchemaNotFoundSnafu {
name: &self.data.request.schema_name,
})?;
let table = schema
.table(&self.data.request.table_name)
.await
.context(AccessCatalogSnafu)?
.context(TableNotFoundSnafu {
name: &self.data.request.table_name,
})?;
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
ensure!(
!schema
.table_exist(new_table_name)
.context(AccessCatalogSnafu)?,
TableExistsSnafu {
name: new_table_name,
}
);
}
self.data.state = AlterTableState::EngineAlterTable;
// Assign procedure id to the subprocedure.
self.data.subprocedure_id = Some(ProcedureId::random());
// Set the table id.
self.data.table_id = Some(table.table_info().ident.table_id);
Ok(Status::executing(true))
}
async fn on_engine_alter_table(&mut self, ctx: &Context) -> Result<Status> {
// Safety: subprocedure id is always set in this state.
let sub_id = self.data.subprocedure_id.unwrap();
// Query subprocedure state.
let Some(sub_state) = ctx.provider.procedure_state(sub_id).await? else {
logging::info!(
"On engine alter table {}, subprocedure not found, sub_id: {}",
self.data.request.table_name,
sub_id
);
// If the subprocedure is not found, we create a new subprocedure with the same id.
let engine_ctx = EngineContext::default();
let procedure = self
.engine_procedure
.alter_table_procedure(&engine_ctx, self.data.request.clone())
.map_err(Error::from_error_ext)?;
return Ok(Status::Suspended {
subprocedures: vec![ProcedureWithId {
id: sub_id,
procedure,
}],
persist: true,
});
};
match sub_state {
ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
subprocedures: Vec::new(),
persist: false,
}),
ProcedureState::Done => {
logging::info!(
"On engine alter table {}, done, sub_id: {}",
self.data.request.table_name,
sub_id
);
// The sub procedure is done, we can execute next step.
if self.data.request.is_rename_table() {
// We also need to rename the table in the catalog.
self.data.state = AlterTableState::RenameInCatalog;
Ok(Status::executing(true))
} else {
// If this isn't a rename operation, we are done.
Ok(Status::Done)
}
}
ProcedureState::Failed { .. } => {
// Return error if the subprocedure is failed.
SubprocedureFailedSnafu {
subprocedure_id: sub_id,
}
.fail()?
}
}
}
async fn on_rename_in_catalog(&mut self) -> Result<Status> {
// Safety: table id is available in this state.
let table_id = self.data.table_id.unwrap();
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let rename_req = RenameTableRequest {
catalog: self.data.request.catalog_name.clone(),
schema: self.data.request.schema_name.clone(),
table_name: self.data.request.table_name.clone(),
new_table_name: new_table_name.clone(),
table_id,
};
self.catalog_manager
.rename_table(rename_req)
.await
.map_err(Error::from_error_ext)?;
}
Ok(Status::Done)
}
}
/// Represents each step while altering a table in the datanode.
#[derive(Debug, Serialize, Deserialize)]
enum AlterTableState {
/// Validate request and prepare to alter table.
Prepare,
/// Alter table in the table engine.
EngineAlterTable,
/// Rename the table in the catalog (optional).
RenameInCatalog,
}
/// Serializable data of [AlterTableProcedure].
#[derive(Debug, Serialize, Deserialize)]
struct AlterTableData {
/// Current state.
state: AlterTableState,
/// Request to alter this table.
request: AlterTableRequest,
/// Id of the table.
///
/// Available after [AlterTableState::Prepare] state.
table_id: Option<TableId>,
/// Id of the subprocedure to create this table in the engine.
///
/// This id is `Some` while the procedure is in [AlterTableState::EngineAlterTable]
/// state.
subprocedure_id: Option<ProcedureId>,
}
impl AlterTableData {
fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.request.catalog_name,
schema: &self.request.schema_name,
table: &self.request.table_name,
}
}
}
#[cfg(test)]
mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_alter_table_procedure_rename() {
let env = TestEnv::new("rename");
let table_name = "test_old";
env.create_table(table_name).await;
let new_table_name = "test_new";
let request = AlterTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
alter_kind: AlterKind::RenameTable {
new_table_name: new_table_name.to_string(),
},
};
let TestEnv {
dir: _dir,
table_engine,
procedure_manager,
catalog_manager,
} = env;
let procedure =
AlterTableProcedure::new(request, catalog_manager.clone(), table_engine.clone());
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = procedure_manager.submit(procedure_with_id).await.unwrap();
watcher.changed().await.unwrap();
let catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.unwrap();
let schema = catalog.schema(DEFAULT_SCHEMA_NAME).unwrap().unwrap();
let table = schema.table(new_table_name).await.unwrap().unwrap();
let table_info = table.table_info();
assert_eq!(new_table_name, table_info.name);
assert!(schema.table(table_name).await.unwrap().is_none());
}
}

View File

@@ -66,7 +66,7 @@ impl Procedure for CreateTableProcedure {
}
impl CreateTableProcedure {
const TYPE_NAME: &str = "table-procedures::CreateTableProcedure";
const TYPE_NAME: &str = "table-procedure::CreateTableProcedure";
/// Returns a new [CreateTableProcedure].
pub fn new(
@@ -176,7 +176,7 @@ impl CreateTableProcedure {
// do this check as we might not submitted the subprocedure yet when the manager
// recover this procedure from procedure store.
logging::info!(
"On engine create table {}, not found, sub_id: {}",
"On engine create table {}, subprocedure not found, sub_id: {}",
self.data.request.table_name,
sub_id
);
@@ -186,7 +186,7 @@ impl CreateTableProcedure {
let procedure = self
.engine_procedure
.create_table_procedure(&engine_ctx, self.data.request.clone())
.map_err(Error::external)?;
.map_err(Error::from_error_ext)?;
return Ok(Status::Suspended {
subprocedures: vec![ProcedureWithId {
id: sub_id,
@@ -238,7 +238,7 @@ impl CreateTableProcedure {
let table_exists = schema
.table(&self.data.request.table_name)
.await
.map_err(Error::external)?
.map_err(Error::from_error_ext)?
.is_some();
if table_exists {
// Table already exists.
@@ -251,7 +251,7 @@ impl CreateTableProcedure {
let table = self
.table_engine
.get_table(&engine_ctx, &table_ref)
.map_err(Error::external)?
.map_err(Error::from_error_ext)?
.unwrap();
let register_req = RegisterTableRequest {
@@ -264,7 +264,7 @@ impl CreateTableProcedure {
self.catalog_manager
.register_table(register_req)
.await
.map_err(Error::external)?;
.map_err(Error::from_error_ext)?;
Ok(Status::Done)
}
@@ -307,48 +307,10 @@ impl CreateTableData {
#[cfg(test)]
mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use mito::engine::MITO_ENGINE;
use table::engine::{EngineContext, TableEngine};
use super::*;
use crate::test_util::TestEnv;
fn schema_for_test() -> RawSchema {
let column_schemas = vec![
// Key
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
// Nullable value column: cpu
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
// Non-null value column: memory
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true),
];
RawSchema::new(column_schemas)
}
fn new_create_request(table_name: &str) -> CreateTableRequest {
CreateTableRequest {
id: 1,
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: table_name.to_string(),
desc: Some("a test table".to_string()),
schema: schema_for_test(),
region_numbers: vec![0, 1],
create_if_not_exists: true,
primary_key_indices: vec![0],
table_options: Default::default(),
engine: MITO_ENGINE.to_string(),
}
}
use crate::test_util::{self, TestEnv};
#[tokio::test]
async fn test_create_table_procedure() {
@@ -360,7 +322,7 @@ mod tests {
} = TestEnv::new("create");
let table_name = "test_create";
let request = new_create_request(table_name);
let request = test_util::new_create_request(table_name);
let procedure = CreateTableProcedure::new(
request.clone(),
catalog_manager,

View File

@@ -51,11 +51,17 @@ pub enum Error {
#[snafu(display("Schema {} not found", name))]
SchemaNotFound { name: String },
#[snafu(display("Table {} not found", name))]
TableNotFound { name: String },
#[snafu(display("Subprocedure {} failed", subprocedure_id))]
SubprocedureFailed {
subprocedure_id: ProcedureId,
location: Location,
},
#[snafu(display("Table {} already exists", name))]
TableExists { name: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -70,7 +76,10 @@ impl ErrorExt for Error {
}
InvalidRawSchema { source, .. } => source.status_code(),
AccessCatalog { source } => source.status_code(),
CatalogNotFound { .. } | SchemaNotFound { .. } => StatusCode::InvalidArguments,
CatalogNotFound { .. } | SchemaNotFound { .. } | TableExists { .. } => {
StatusCode::InvalidArguments
}
TableNotFound { .. } => StatusCode::TableNotFound,
}
}

View File

@@ -14,11 +14,13 @@
//! Procedures for table operations.
mod alter;
mod create;
pub mod error;
#[cfg(test)]
mod test_util;
pub use alter::AlterTableProcedure;
use catalog::CatalogManagerRef;
use common_procedure::ProcedureManager;
pub use create::CreateTableProcedure;
@@ -35,9 +37,10 @@ pub fn register_procedure_loaders(
procedure_manager: &dyn ProcedureManager,
) {
CreateTableProcedure::register_loader(
catalog_manager,
engine_procedure,
catalog_manager.clone(),
engine_procedure.clone(),
table_engine,
procedure_manager,
);
AlterTableProcedure::register_loader(catalog_manager, engine_procedure, procedure_manager);
}

View File

@@ -17,18 +17,24 @@ use std::time::Duration;
use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
use common_procedure::{ProcedureManagerRef, ProcedureWithId};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use mito::engine::{MitoEngine, MITO_ENGINE};
use object_store::services::Fs;
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::requests::CreateTableRequest;
use crate::CreateTableProcedure;
pub struct TestEnv {
pub dir: TempDir,
@@ -79,4 +85,57 @@ impl TestEnv {
catalog_manager,
}
}
pub async fn create_table(&self, table_name: &str) {
let request = new_create_request(table_name);
let procedure = CreateTableProcedure::new(
request,
self.catalog_manager.clone(),
self.table_engine.clone(),
self.table_engine.clone(),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let mut watcher = self
.procedure_manager
.submit(procedure_with_id)
.await
.unwrap();
watcher.changed().await.unwrap();
}
}
pub fn schema_for_test() -> RawSchema {
let column_schemas = vec![
// Key
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
// Nullable value column: cpu
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
// Non-null value column: memory
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
)
.with_time_index(true),
];
RawSchema::new(column_schemas)
}
pub fn new_create_request(table_name: &str) -> CreateTableRequest {
CreateTableRequest {
id: 1,
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
desc: Some("a test table".to_string()),
schema: schema_for_test(),
region_numbers: vec![0, 1],
create_if_not_exists: true,
primary_key_indices: vec![0],
table_options: Default::default(),
engine: MITO_ENGINE.to_string(),
}
}