From dee20144d7d5f24fe4aa4f0156a693edff45cdb4 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 31 Mar 2023 14:40:54 +0800 Subject: [PATCH] feat: Implement procedure to alter a table for mito engine (#1259) * feat: wip * fix: Fix CreateMitoTable::table_schema not initialized from json * feat: Implement AlterMitoTable procedure * test: Add test for alter procedure * feat: Register alter procedure * fix: Recover procedures after catalog manager is started * feat: Simplify usage of table schema in create table procedure * test: Add rename test * test: Add drop columns test --- src/datanode/src/instance.rs | 20 +- src/mito/src/engine.rs | 35 +- src/mito/src/engine/procedure.rs | 5 +- src/mito/src/engine/procedure/alter.rs | 540 ++++++++++++++++++++++++ src/mito/src/engine/procedure/create.rs | 30 +- src/mito/src/error.rs | 11 +- src/mito/src/table.rs | 2 +- src/mito/src/table/test_util.rs | 13 +- src/table/src/engine.rs | 7 + src/table/src/requests.rs | 6 +- 10 files changed, 637 insertions(+), 32 deletions(-) create mode 100644 src/mito/src/engine/procedure/alter.rs diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index ae2495dd58..7e55add6eb 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -74,6 +74,7 @@ pub struct Instance { pub(crate) script_executor: ScriptExecutor, pub(crate) table_id_provider: Option, pub(crate) heartbeat_task: Option, + procedure_manager: Option, } pub type InstanceRef = Arc; @@ -183,6 +184,7 @@ impl Instance { }; let procedure_manager = create_procedure_manager(&opts.procedure).await?; + // Register all procedures. if let Some(procedure_manager) = &procedure_manager { table_engine.register_procedure_loaders(&**procedure_manager); table_procedure::register_procedure_loaders( @@ -191,12 +193,6 @@ impl Instance { table_engine.clone(), &**procedure_manager, ); - - // Recover procedures. - procedure_manager - .recover() - .await - .context(RecoverProcedureSnafu)?; } Ok(Self { @@ -205,12 +201,13 @@ impl Instance { table_engine.clone(), catalog_manager.clone(), table_engine, - procedure_manager, + procedure_manager.clone(), ), catalog_manager, script_executor, heartbeat_task, table_id_provider, + procedure_manager, }) } @@ -222,6 +219,15 @@ impl Instance { if let Some(task) = &self.heartbeat_task { task.start().await?; } + + // Recover procedures after the catalog manager is started, so we can + // ensure we can access all tables from the catalog manager. + if let Some(procedure_manager) = &self.procedure_manager { + procedure_manager + .recover() + .await + .context(RecoverProcedureSnafu)?; + } Ok(()) } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index b83d43f091..47f6f16a50 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -47,7 +47,7 @@ use table::{error as table_error, Result as TableResult, Table}; use tokio::sync::Mutex; use crate::config::EngineConfig; -use crate::engine::procedure::CreateMitoTable; +use crate::engine::procedure::{AlterMitoTable, CreateMitoTable}; use crate::error::{ self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu, @@ -166,7 +166,24 @@ impl TableEngineProcedure for MitoEngine { .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; - let procedure = Box::new(CreateMitoTable::new(request, self.inner.clone())); + let procedure = Box::new( + CreateMitoTable::new(request, self.inner.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?, + ); + Ok(procedure) + } + + fn alter_table_procedure( + &self, + _ctx: &EngineContext, + request: AlterTableRequest, + ) -> TableResult { + let procedure = Box::new( + AlterMitoTable::new(request, self.inner.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?, + ); Ok(procedure) } } @@ -175,7 +192,7 @@ pub(crate) struct MitoEngineInner { /// All tables opened by the engine. Map key is formatted [TableReference]. /// /// Writing to `tables` should also hold the `table_mutex`. - tables: RwLock>, + tables: RwLock>>>, object_store: ObjectStore, storage_engine: S, /// Table mutex is used to protect the operations such as creating/opening/closing @@ -546,6 +563,16 @@ impl MitoEngineInner { } fn get_table(&self, table_ref: &TableReference) -> Option { + self.tables + .read() + .unwrap() + .get(&table_ref.to_string()) + .cloned() + .map(|table| table as _) + } + + /// Returns the [MitoTable]. + fn get_mito_table(&self, table_ref: &TableReference) -> Option>> { self.tables .read() .unwrap() @@ -579,7 +606,7 @@ impl MitoEngineInner { table: table_name, }; let table = self - .get_table(&table_ref) + .get_mito_table(&table_ref) .context(error::TableNotFoundSnafu { table_name })?; logging::info!("start altering table {} with request {:?}", table_name, req); diff --git a/src/mito/src/engine/procedure.rs b/src/mito/src/engine/procedure.rs index 2985f3c4cb..353c63e824 100644 --- a/src/mito/src/engine/procedure.rs +++ b/src/mito/src/engine/procedure.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod alter; mod create; use std::sync::Arc; +pub(crate) use alter::AlterMitoTable; use common_procedure::ProcedureManager; pub(crate) use create::CreateMitoTable; use store_api::storage::StorageEngine; @@ -31,7 +33,8 @@ pub(crate) fn register_procedure_loaders( procedure_manager: &dyn ProcedureManager, ) { // The procedure names are expected to be unique, so we just panic on error. - CreateMitoTable::register_loader(engine_inner, procedure_manager); + CreateMitoTable::register_loader(engine_inner.clone(), procedure_manager); + AlterMitoTable::register_loader(engine_inner, procedure_manager); } #[cfg(test)] diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs new file mode 100644 index 0000000000..6f935e8e21 --- /dev/null +++ b/src/mito/src/engine/procedure/alter.rs @@ -0,0 +1,540 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status}; +use common_telemetry::logging; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::manifest::Manifest; +use store_api::storage::{AlterRequest, Region, RegionMeta, StorageEngine}; +use table::engine::TableReference; +use table::metadata::{RawTableInfo, TableInfo, TableVersion}; +use table::requests::{AlterKind, AlterTableRequest}; +use table::Table; + +use crate::engine::MitoEngineInner; +use crate::error::{ + BuildTableMetaSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu, +}; +use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList}; +use crate::table::{create_alter_operation, MitoTable}; + +/// Procedure to alter a [MitoTable]. +pub(crate) struct AlterMitoTable { + data: AlterTableData, + engine_inner: Arc>, + table: Arc>, + /// The table info after alteration. + new_info: Option, +} + +#[async_trait] +impl Procedure for AlterMitoTable { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + match self.data.state { + AlterTableState::Prepare => self.on_prepare(), + AlterTableState::AlterRegions => self.on_alter_regions().await, + AlterTableState::UpdateTableManifest => self.on_update_table_manifest().await, + } + } + + fn dump(&self) -> Result { + let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?; + Ok(json) + } + + fn lock_key(&self) -> LockKey { + let table_ref = self.data.table_ref(); + let info = self.table.table_info(); + let mut keys: Vec<_> = info + .meta + .region_numbers + .iter() + .map(|number| format!("{table_ref}/region-{number}")) + .collect(); + // If alter kind is rename, we also need to lock the region with another name. + if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { + let new_table_ref = TableReference { + catalog: &self.data.request.catalog_name, + schema: &self.data.request.schema_name, + table: new_table_name, + }; + // We only acquire the first region. + keys.push(format!("{new_table_ref}/region-0")); + } + LockKey::new(keys) + } +} + +impl AlterMitoTable { + const TYPE_NAME: &str = "mito::AlterMitoTable"; + + /// Returns a new [AlterMitoTable]. + pub(crate) fn new( + request: AlterTableRequest, + engine_inner: Arc>, + ) -> Result { + let mut data = AlterTableData { + state: AlterTableState::Prepare, + request, + // We set table version later. + table_version: 0, + }; + let table_ref = data.table_ref(); + let table = + engine_inner + .get_mito_table(&table_ref) + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + let info = table.table_info(); + data.table_version = info.ident.version; + + Ok(AlterMitoTable { + data, + engine_inner, + table, + new_info: None, + }) + } + + /// Register the loader of this procedure to the `procedure_manager`. + /// + /// # Panics + /// Panics on error. + pub(crate) fn register_loader( + engine_inner: Arc>, + procedure_manager: &dyn ProcedureManager, + ) { + procedure_manager + .register_loader( + Self::TYPE_NAME, + Box::new(move |data| { + Self::from_json(data, engine_inner.clone()).map(|p| Box::new(p) as _) + }), + ) + .unwrap() + } + + /// Recover the procedure from json. + fn from_json(json: &str, engine_inner: Arc>) -> Result { + let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + let table_ref = data.table_ref(); + let table = + engine_inner + .get_mito_table(&table_ref) + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })?; + + Ok(AlterMitoTable { + data, + engine_inner, + table, + new_info: None, + }) + } + + /// Prepare table info. + fn on_prepare(&mut self) -> Result { + let current_info = self.table.table_info(); + ensure!( + current_info.ident.version == self.data.table_version, + VersionChangedSnafu { + expect: self.data.table_version, + actual: current_info.ident.version, + } + ); + + self.init_new_info(¤t_info)?; + self.data.state = AlterTableState::AlterRegions; + + Ok(Status::executing(true)) + } + + /// Alter regions. + async fn on_alter_regions(&mut self) -> Result { + let current_info = self.table.table_info(); + ensure!( + current_info.ident.version == self.data.table_version, + VersionChangedSnafu { + expect: self.data.table_version, + actual: current_info.ident.version, + } + ); + + self.init_new_info(¤t_info)?; + let new_info = self.new_info.as_mut().unwrap(); + let table_name = &self.data.request.table_name; + + let Some(alter_op) = create_alter_operation(table_name, &self.data.request.alter_kind, &mut new_info.meta) + .map_err(Error::from_error_ext)? else { + // Don't need to alter the region. + self.data.state = AlterTableState::UpdateTableManifest; + + return Ok(Status::executing(true)); + }; + + let regions = self.table.regions(); + // For each region, alter it if its version is not updated. + for region in regions.values() { + let region_meta = region.in_memory_metadata(); + if u64::from(region_meta.version()) > self.data.table_version { + // Region is already altered. + continue; + } + + let alter_req = AlterRequest { + operation: alter_op.clone(), + version: region_meta.version(), + }; + // Alter the region. + logging::debug!( + "start altering region {} of table {}, with request {:?}", + region.name(), + table_name, + alter_req, + ); + region + .alter(alter_req) + .await + .map_err(Error::from_error_ext)?; + } + + self.data.state = AlterTableState::UpdateTableManifest; + + Ok(Status::executing(true)) + } + + /// Persist the alteration to the manifest and update table info. + async fn on_update_table_manifest(&mut self) -> Result { + // Get current table info. + let current_info = self.table.table_info(); + if current_info.ident.version > self.data.table_version { + logging::info!( + "table {} version is already updated, current: {}, old_version: {}", + self.data.request.table_name, + current_info.ident.version, + self.data.table_version, + ); + return Ok(Status::Done); + } + + self.init_new_info(¤t_info)?; + let new_info = self.new_info.as_ref().unwrap(); + let table_name = &self.data.request.table_name; + + logging::debug!( + "start updating the manifest of table {} with new table info {:?}", + table_name, + new_info + ); + + self.table + .manifest() + .update(TableMetaActionList::with_action(TableMetaAction::Change( + Box::new(TableChange { + table_info: RawTableInfo::from(new_info.clone()), + }), + ))) + .await + .context(UpdateTableManifestSnafu { table_name })?; + + // Update in memory metadata of the table. + self.table.set_table_info(new_info.clone()); + + // Rename key in tables map. + if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { + let mut table_ref = self.data.table_ref(); + let mut tables = self.engine_inner.tables.write().unwrap(); + tables.remove(&table_ref.to_string()); + table_ref.table = new_table_name.as_str(); + tables.insert(table_ref.to_string(), self.table.clone()); + } + + Ok(Status::Done) + } + + fn init_new_info(&mut self, current_info: &TableInfo) -> Result<()> { + if self.new_info.is_some() { + return Ok(()); + } + + let table_name = ¤t_info.name; + let mut new_info = TableInfo::clone(current_info); + // setup new table info + match &self.data.request.alter_kind { + AlterKind::RenameTable { new_table_name } => { + new_info.name = new_table_name.clone(); + } + AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => { + let table_meta = ¤t_info.meta; + let new_meta = table_meta + .builder_with_alter_kind(table_name, &self.data.request.alter_kind) + .map_err(Error::from_error_ext)? + .build() + .context(BuildTableMetaSnafu { table_name })?; + new_info.meta = new_meta; + } + } + // Increase version of the table. + new_info.ident.version = current_info.ident.version + 1; + + self.new_info = Some(new_info); + + Ok(()) + } +} + +/// Represents each step while altering table in the mito engine. +#[derive(Debug, Serialize, Deserialize)] +enum AlterTableState { + /// Prepare to alter table. + Prepare, + /// Alter regions. + AlterRegions, + /// Update table manifest. + UpdateTableManifest, +} + +/// Serializable data of [AlterMitoTable]. +#[derive(Debug, Serialize, Deserialize)] +struct AlterTableData { + state: AlterTableState, + request: AlterTableRequest, + /// Table version before alteration. + table_version: TableVersion, +} + +impl AlterTableData { + fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.request.catalog_name, + schema: &self.request.schema_name, + table: &self.request.table_name, + } + } +} + +#[cfg(test)] +mod tests { + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use table::engine::{EngineContext, TableEngine, TableEngineProcedure}; + use table::requests::AddColumnRequest; + + use super::*; + use crate::engine::procedure::procedure_test_util::{self, TestEnv}; + use crate::table::test_util; + + fn new_add_columns_req() -> AlterTableRequest { + let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true); + let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true); + let alter_kind = AlterKind::AddColumns { + columns: vec![ + AddColumnRequest { + column_schema: new_tag, + is_key: true, + }, + AddColumnRequest { + column_schema: new_field, + is_key: false, + }, + ], + }; + test_util::new_alter_request(alter_kind) + } + + #[tokio::test] + async fn test_procedure_add_column() { + common_telemetry::init_default_ut_logging(); + + let TestEnv { + table_engine, + dir: _dir, + } = procedure_test_util::setup_test_engine("create_procedure").await; + let schema = Arc::new(test_util::schema_for_test()); + let request = test_util::new_create_request(schema.clone()); + + let engine_ctx = EngineContext::default(); + // Create table first. + let mut procedure = table_engine + .create_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Get metadata of the created table. + let table_ref = TableReference { + catalog: &request.catalog_name, + schema: &request.schema_name, + table: &request.table_name, + }; + let table = table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .unwrap(); + let old_info = table.table_info(); + let old_meta = &old_info.meta; + + // Alter the table. + let request = new_add_columns_req(); + let mut procedure = table_engine + .alter_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Validate. + let table = table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .unwrap(); + let new_info = table.table_info(); + let new_meta = &new_info.meta; + let new_schema = &new_meta.schema; + + assert_eq!(&[0, 4], &new_meta.primary_key_indices[..]); + assert_eq!(&[1, 2, 3, 5], &new_meta.value_indices[..]); + assert!(new_schema.column_schema_by_name("my_tag").is_some()); + assert!(new_schema.column_schema_by_name("my_field").is_some()); + assert_eq!(new_schema.version(), schema.version() + 1); + assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2); + } + + #[tokio::test] + async fn test_procedure_drop_column() { + common_telemetry::init_default_ut_logging(); + + let TestEnv { + table_engine, + dir: _dir, + } = procedure_test_util::setup_test_engine("create_procedure").await; + let schema = Arc::new(test_util::schema_for_test()); + let request = test_util::new_create_request(schema.clone()); + + let engine_ctx = EngineContext::default(); + // Create table first. + let mut procedure = table_engine + .create_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Add columns. + let request = new_add_columns_req(); + let mut procedure = table_engine + .alter_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Get metadata. + let table_ref = TableReference { + catalog: &request.catalog_name, + schema: &request.schema_name, + table: &request.table_name, + }; + let table = table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .unwrap(); + let old_info = table.table_info(); + let old_meta = &old_info.meta; + + // Then remove memory and my_field from the table. + let alter_kind = AlterKind::DropColumns { + names: vec![String::from("memory"), String::from("my_field")], + }; + let request = test_util::new_alter_request(alter_kind); + let mut procedure = table_engine + .alter_table_procedure(&engine_ctx, request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Validate. + let new_info = table.table_info(); + let new_meta = &new_info.meta; + let new_schema = &new_meta.schema; + + let remaining_names: Vec = new_schema + .column_schemas() + .iter() + .map(|column_schema| column_schema.name.clone()) + .collect(); + assert_eq!(&["host", "cpu", "ts", "my_tag"], &remaining_names[..]); + assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]); + assert_eq!(&[1, 2], &new_meta.value_indices[..]); + assert_eq!(new_schema.version(), old_meta.schema.version() + 1); + assert_eq!(new_meta.region_numbers, old_meta.region_numbers); + } + + #[tokio::test] + async fn test_procedure_rename_table() { + common_telemetry::init_default_ut_logging(); + + let TestEnv { + table_engine, + dir: _dir, + } = procedure_test_util::setup_test_engine("create_procedure").await; + let schema = Arc::new(test_util::schema_for_test()); + let create_request = test_util::new_create_request(schema.clone()); + + let engine_ctx = EngineContext::default(); + // Create table first. + let mut procedure = table_engine + .create_table_procedure(&engine_ctx, create_request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Get metadata of the created table. + let mut table_ref = TableReference { + catalog: &create_request.catalog_name, + schema: &create_request.schema_name, + table: &create_request.table_name, + }; + let table = table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .unwrap(); + + // Rename the table. + let new_name = "another_table".to_string(); + let alter_kind = AlterKind::RenameTable { + new_table_name: new_name.clone(), + }; + let alter_request = test_util::new_alter_request(alter_kind); + let mut procedure = table_engine + .alter_table_procedure(&engine_ctx, alter_request.clone()) + .unwrap(); + procedure_test_util::execute_procedure_until_done(&mut procedure).await; + + // Validate. + let info = table.table_info(); + assert_eq!(new_name, info.name); + assert!(table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .is_none()); + table_ref.table = &new_name; + assert!(table_engine + .get_table(&engine_ctx, &table_ref) + .unwrap() + .is_some()); + } +} diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index ee0827ae3e..174a45ef88 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -43,7 +43,7 @@ pub(crate) struct CreateMitoTable { /// Created regions of the table. regions: HashMap, /// Schema of the table. - table_schema: Option, + table_schema: SchemaRef, } #[async_trait] @@ -81,8 +81,14 @@ impl CreateMitoTable { const TYPE_NAME: &str = "mito::CreateMitoTable"; /// Returns a new [CreateMitoTable]. - pub(crate) fn new(request: CreateTableRequest, engine_inner: Arc>) -> Self { - CreateMitoTable { + pub(crate) fn new( + request: CreateTableRequest, + engine_inner: Arc>, + ) -> Result { + let table_schema = + Schema::try_from(request.schema.clone()).context(InvalidRawSchemaSnafu)?; + + Ok(CreateMitoTable { data: CreateTableData { state: CreateTableState::Prepare, request, @@ -90,8 +96,8 @@ impl CreateMitoTable { }, engine_inner, regions: HashMap::new(), - table_schema: None, - } + table_schema: Arc::new(table_schema), + }) } /// Register the loader of this procedure to the `procedure_manager`. @@ -115,12 +121,14 @@ impl CreateMitoTable { /// Recover the procedure from json. fn from_json(json: &str, engine_inner: Arc>) -> Result { let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + let table_schema = + Schema::try_from(data.request.schema.clone()).context(InvalidRawSchemaSnafu)?; Ok(CreateMitoTable { data, engine_inner, regions: HashMap::new(), - table_schema: None, + table_schema: Arc::new(table_schema), }) } @@ -166,19 +174,17 @@ impl CreateMitoTable { ttl, }; - let table_schema = - Schema::try_from(self.data.request.schema.clone()).context(InvalidRawSchemaSnafu)?; let primary_key_indices = &self.data.request.primary_key_indices; let (next_column_id, default_cf) = engine::build_column_family( engine::INIT_COLUMN_ID, &self.data.request.table_name, - &table_schema, + &self.table_schema, primary_key_indices, )?; let (next_column_id, row_key) = engine::build_row_key_desc( next_column_id, &self.data.request.table_name, - &table_schema, + &self.table_schema, primary_key_indices, )?; self.data.next_column_id = Some(next_column_id); @@ -228,7 +234,6 @@ impl CreateMitoTable { // All regions are created, moves to the next step. self.data.state = CreateTableState::WriteTableManifest; - self.table_schema = Some(Arc::new(table_schema)); Ok(Status::executing(true)) } @@ -276,10 +281,9 @@ impl CreateMitoTable { ) -> Result> { // Safety: We are in `WriteTableManifest` state. let next_column_id = self.data.next_column_id.unwrap(); - let table_schema = self.table_schema.clone().unwrap(); let table_meta = TableMetaBuilder::default() - .schema(table_schema) + .schema(self.table_schema.clone()) .engine(engine::MITO_ENGINE) .next_column_id(next_column_id) .primary_key_indices(self.data.request.primary_key_indices.clone()) diff --git a/src/mito/src/error.rs b/src/mito/src/error.rs index 55446db42a..a87b385206 100644 --- a/src/mito/src/error.rs +++ b/src/mito/src/error.rs @@ -17,7 +17,7 @@ use std::any::Any; use common_error::ext::BoxedError; use common_error::prelude::*; use store_api::storage::RegionNumber; -use table::metadata::{TableInfoBuilderError, TableMetaBuilderError}; +use table::metadata::{TableInfoBuilderError, TableMetaBuilderError, TableVersion}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -187,6 +187,12 @@ pub enum Error { #[snafu(display("Invalid schema, source: {}", source))] InvalidRawSchema { source: datatypes::error::Error }, + + #[snafu(display("Table version changed, expect: {}, actual: {}", expect, actual))] + VersionChanged { + expect: TableVersion, + actual: TableVersion, + }, } pub type Result = std::result::Result; @@ -211,7 +217,8 @@ impl ErrorExt for Error { | InvalidPrimaryKey { .. } | MissingTimestampIndex { .. } | TableNotFound { .. } - | InvalidRawSchema { .. } => StatusCode::InvalidArguments, + | InvalidRawSchema { .. } + | VersionChanged { .. } => StatusCode::InvalidArguments, TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected, diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index d72109f0b1..5b2e80fbd1 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -558,7 +558,7 @@ impl MitoTable { } /// Create [`AlterOperation`] according to given `alter_kind`. -fn create_alter_operation( +pub(crate) fn create_alter_operation( table_name: &str, alter_kind: &AlterKind, table_meta: &mut TableMeta, diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index dcd5445711..0da99944a2 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -29,7 +29,9 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::engine::{EngineContext, TableEngine}; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType}; -use table::requests::{CreateTableRequest, InsertRequest, TableOptions}; +use table::requests::{ + AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest, TableOptions, +}; use table::{Table, TableRef}; use crate::config::EngineConfig; @@ -118,6 +120,15 @@ pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { } } +pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest { + AlterTableRequest { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: TABLE_NAME.to_string(), + alter_kind, + } +} + pub struct TestEngineComponents { pub table_engine: MitoEngine>, pub storage_engine: EngineImpl, diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index bdebd2600c..c679362cfd 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -121,6 +121,13 @@ pub trait TableEngineProcedure: Send + Sync { ctx: &EngineContext, request: CreateTableRequest, ) -> Result; + + /// Returns a procedure that alters table by specific `request`. + fn alter_table_procedure( + &self, + ctx: &EngineContext, + request: AlterTableRequest, + ) -> Result; } pub type TableEngineProcedureRef = Arc; diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 92a26dde2f..e9f4854c7a 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -136,7 +136,7 @@ pub struct OpenTableRequest { } /// Alter table request -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AlterTableRequest { pub catalog_name: String, pub schema_name: String, @@ -151,13 +151,13 @@ impl AlterTableRequest { } /// Add column request -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AddColumnRequest { pub column_schema: ColumnSchema, pub is_key: bool, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum AlterKind { AddColumns { columns: Vec }, DropColumns { names: Vec },