mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 07:12:54 +00:00
feat: Implement procedure to alter a table for mito engine (#1259)
* feat: wip * fix: Fix CreateMitoTable::table_schema not initialized from json * feat: Implement AlterMitoTable procedure * test: Add test for alter procedure * feat: Register alter procedure * fix: Recover procedures after catalog manager is started * feat: Simplify usage of table schema in create table procedure * test: Add rename test * test: Add drop columns test
This commit is contained in:
@@ -74,6 +74,7 @@ pub struct Instance {
|
||||
pub(crate) script_executor: ScriptExecutor,
|
||||
pub(crate) table_id_provider: Option<TableIdProviderRef>,
|
||||
pub(crate) heartbeat_task: Option<HeartbeatTask>,
|
||||
procedure_manager: Option<ProcedureManagerRef>,
|
||||
}
|
||||
|
||||
pub type InstanceRef = Arc<Instance>;
|
||||
@@ -183,6 +184,7 @@ impl Instance {
|
||||
};
|
||||
|
||||
let procedure_manager = create_procedure_manager(&opts.procedure).await?;
|
||||
// Register all procedures.
|
||||
if let Some(procedure_manager) = &procedure_manager {
|
||||
table_engine.register_procedure_loaders(&**procedure_manager);
|
||||
table_procedure::register_procedure_loaders(
|
||||
@@ -191,12 +193,6 @@ impl Instance {
|
||||
table_engine.clone(),
|
||||
&**procedure_manager,
|
||||
);
|
||||
|
||||
// Recover procedures.
|
||||
procedure_manager
|
||||
.recover()
|
||||
.await
|
||||
.context(RecoverProcedureSnafu)?;
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
@@ -205,12 +201,13 @@ impl Instance {
|
||||
table_engine.clone(),
|
||||
catalog_manager.clone(),
|
||||
table_engine,
|
||||
procedure_manager,
|
||||
procedure_manager.clone(),
|
||||
),
|
||||
catalog_manager,
|
||||
script_executor,
|
||||
heartbeat_task,
|
||||
table_id_provider,
|
||||
procedure_manager,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -222,6 +219,15 @@ impl Instance {
|
||||
if let Some(task) = &self.heartbeat_task {
|
||||
task.start().await?;
|
||||
}
|
||||
|
||||
// Recover procedures after the catalog manager is started, so we can
|
||||
// ensure we can access all tables from the catalog manager.
|
||||
if let Some(procedure_manager) = &self.procedure_manager {
|
||||
procedure_manager
|
||||
.recover()
|
||||
.await
|
||||
.context(RecoverProcedureSnafu)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ use table::{error as table_error, Result as TableResult, Table};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
use crate::engine::procedure::CreateMitoTable;
|
||||
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable};
|
||||
use crate::error::{
|
||||
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
|
||||
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu,
|
||||
@@ -166,7 +166,24 @@ impl<S: StorageEngine> TableEngineProcedure for MitoEngine<S> {
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
|
||||
let procedure = Box::new(CreateMitoTable::new(request, self.inner.clone()));
|
||||
let procedure = Box::new(
|
||||
CreateMitoTable::new(request, self.inner.clone())
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?,
|
||||
);
|
||||
Ok(procedure)
|
||||
}
|
||||
|
||||
fn alter_table_procedure(
|
||||
&self,
|
||||
_ctx: &EngineContext,
|
||||
request: AlterTableRequest,
|
||||
) -> TableResult<BoxedProcedure> {
|
||||
let procedure = Box::new(
|
||||
AlterMitoTable::new(request, self.inner.clone())
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?,
|
||||
);
|
||||
Ok(procedure)
|
||||
}
|
||||
}
|
||||
@@ -175,7 +192,7 @@ pub(crate) struct MitoEngineInner<S: StorageEngine> {
|
||||
/// All tables opened by the engine. Map key is formatted [TableReference].
|
||||
///
|
||||
/// Writing to `tables` should also hold the `table_mutex`.
|
||||
tables: RwLock<HashMap<String, TableRef>>,
|
||||
tables: RwLock<HashMap<String, Arc<MitoTable<S::Region>>>>,
|
||||
object_store: ObjectStore,
|
||||
storage_engine: S,
|
||||
/// Table mutex is used to protect the operations such as creating/opening/closing
|
||||
@@ -546,6 +563,16 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
}
|
||||
|
||||
fn get_table(&self, table_ref: &TableReference) -> Option<TableRef> {
|
||||
self.tables
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(&table_ref.to_string())
|
||||
.cloned()
|
||||
.map(|table| table as _)
|
||||
}
|
||||
|
||||
/// Returns the [MitoTable].
|
||||
fn get_mito_table(&self, table_ref: &TableReference) -> Option<Arc<MitoTable<S::Region>>> {
|
||||
self.tables
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -579,7 +606,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
|
||||
table: table_name,
|
||||
};
|
||||
let table = self
|
||||
.get_table(&table_ref)
|
||||
.get_mito_table(&table_ref)
|
||||
.context(error::TableNotFoundSnafu { table_name })?;
|
||||
|
||||
logging::info!("start altering table {} with request {:?}", table_name, req);
|
||||
|
||||
@@ -12,10 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod alter;
|
||||
mod create;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) use alter::AlterMitoTable;
|
||||
use common_procedure::ProcedureManager;
|
||||
pub(crate) use create::CreateMitoTable;
|
||||
use store_api::storage::StorageEngine;
|
||||
@@ -31,7 +33,8 @@ pub(crate) fn register_procedure_loaders<S: StorageEngine>(
|
||||
procedure_manager: &dyn ProcedureManager,
|
||||
) {
|
||||
// The procedure names are expected to be unique, so we just panic on error.
|
||||
CreateMitoTable::register_loader(engine_inner, procedure_manager);
|
||||
CreateMitoTable::register_loader(engine_inner.clone(), procedure_manager);
|
||||
AlterMitoTable::register_loader(engine_inner, procedure_manager);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
540
src/mito/src/engine/procedure/alter.rs
Normal file
540
src/mito/src/engine/procedure/alter.rs
Normal file
@@ -0,0 +1,540 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status};
|
||||
use common_telemetry::logging;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::manifest::Manifest;
|
||||
use store_api::storage::{AlterRequest, Region, RegionMeta, StorageEngine};
|
||||
use table::engine::TableReference;
|
||||
use table::metadata::{RawTableInfo, TableInfo, TableVersion};
|
||||
use table::requests::{AlterKind, AlterTableRequest};
|
||||
use table::Table;
|
||||
|
||||
use crate::engine::MitoEngineInner;
|
||||
use crate::error::{
|
||||
BuildTableMetaSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu,
|
||||
};
|
||||
use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList};
|
||||
use crate::table::{create_alter_operation, MitoTable};
|
||||
|
||||
/// Procedure to alter a [MitoTable].
|
||||
pub(crate) struct AlterMitoTable<S: StorageEngine> {
|
||||
data: AlterTableData,
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
table: Arc<MitoTable<S::Region>>,
|
||||
/// The table info after alteration.
|
||||
new_info: Option<TableInfo>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S: StorageEngine> Procedure for AlterMitoTable<S> {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
|
||||
match self.data.state {
|
||||
AlterTableState::Prepare => self.on_prepare(),
|
||||
AlterTableState::AlterRegions => self.on_alter_regions().await,
|
||||
AlterTableState::UpdateTableManifest => self.on_update_table_manifest().await,
|
||||
}
|
||||
}
|
||||
|
||||
fn dump(&self) -> Result<String> {
|
||||
let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?;
|
||||
Ok(json)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
let table_ref = self.data.table_ref();
|
||||
let info = self.table.table_info();
|
||||
let mut keys: Vec<_> = info
|
||||
.meta
|
||||
.region_numbers
|
||||
.iter()
|
||||
.map(|number| format!("{table_ref}/region-{number}"))
|
||||
.collect();
|
||||
// If alter kind is rename, we also need to lock the region with another name.
|
||||
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
|
||||
let new_table_ref = TableReference {
|
||||
catalog: &self.data.request.catalog_name,
|
||||
schema: &self.data.request.schema_name,
|
||||
table: new_table_name,
|
||||
};
|
||||
// We only acquire the first region.
|
||||
keys.push(format!("{new_table_ref}/region-0"));
|
||||
}
|
||||
LockKey::new(keys)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: StorageEngine> AlterMitoTable<S> {
|
||||
const TYPE_NAME: &str = "mito::AlterMitoTable";
|
||||
|
||||
/// Returns a new [AlterMitoTable].
|
||||
pub(crate) fn new(
|
||||
request: AlterTableRequest,
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
) -> Result<Self> {
|
||||
let mut data = AlterTableData {
|
||||
state: AlterTableState::Prepare,
|
||||
request,
|
||||
// We set table version later.
|
||||
table_version: 0,
|
||||
};
|
||||
let table_ref = data.table_ref();
|
||||
let table =
|
||||
engine_inner
|
||||
.get_mito_table(&table_ref)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
let info = table.table_info();
|
||||
data.table_version = info.ident.version;
|
||||
|
||||
Ok(AlterMitoTable {
|
||||
data,
|
||||
engine_inner,
|
||||
table,
|
||||
new_info: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Register the loader of this procedure to the `procedure_manager`.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics on error.
|
||||
pub(crate) fn register_loader(
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
procedure_manager: &dyn ProcedureManager,
|
||||
) {
|
||||
procedure_manager
|
||||
.register_loader(
|
||||
Self::TYPE_NAME,
|
||||
Box::new(move |data| {
|
||||
Self::from_json(data, engine_inner.clone()).map(|p| Box::new(p) as _)
|
||||
}),
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Recover the procedure from json.
|
||||
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
|
||||
let data: AlterTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
let table_ref = data.table_ref();
|
||||
let table =
|
||||
engine_inner
|
||||
.get_mito_table(&table_ref)
|
||||
.with_context(|| TableNotFoundSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(AlterMitoTable {
|
||||
data,
|
||||
engine_inner,
|
||||
table,
|
||||
new_info: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Prepare table info.
|
||||
fn on_prepare(&mut self) -> Result<Status> {
|
||||
let current_info = self.table.table_info();
|
||||
ensure!(
|
||||
current_info.ident.version == self.data.table_version,
|
||||
VersionChangedSnafu {
|
||||
expect: self.data.table_version,
|
||||
actual: current_info.ident.version,
|
||||
}
|
||||
);
|
||||
|
||||
self.init_new_info(¤t_info)?;
|
||||
self.data.state = AlterTableState::AlterRegions;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
/// Alter regions.
|
||||
async fn on_alter_regions(&mut self) -> Result<Status> {
|
||||
let current_info = self.table.table_info();
|
||||
ensure!(
|
||||
current_info.ident.version == self.data.table_version,
|
||||
VersionChangedSnafu {
|
||||
expect: self.data.table_version,
|
||||
actual: current_info.ident.version,
|
||||
}
|
||||
);
|
||||
|
||||
self.init_new_info(¤t_info)?;
|
||||
let new_info = self.new_info.as_mut().unwrap();
|
||||
let table_name = &self.data.request.table_name;
|
||||
|
||||
let Some(alter_op) = create_alter_operation(table_name, &self.data.request.alter_kind, &mut new_info.meta)
|
||||
.map_err(Error::from_error_ext)? else {
|
||||
// Don't need to alter the region.
|
||||
self.data.state = AlterTableState::UpdateTableManifest;
|
||||
|
||||
return Ok(Status::executing(true));
|
||||
};
|
||||
|
||||
let regions = self.table.regions();
|
||||
// For each region, alter it if its version is not updated.
|
||||
for region in regions.values() {
|
||||
let region_meta = region.in_memory_metadata();
|
||||
if u64::from(region_meta.version()) > self.data.table_version {
|
||||
// Region is already altered.
|
||||
continue;
|
||||
}
|
||||
|
||||
let alter_req = AlterRequest {
|
||||
operation: alter_op.clone(),
|
||||
version: region_meta.version(),
|
||||
};
|
||||
// Alter the region.
|
||||
logging::debug!(
|
||||
"start altering region {} of table {}, with request {:?}",
|
||||
region.name(),
|
||||
table_name,
|
||||
alter_req,
|
||||
);
|
||||
region
|
||||
.alter(alter_req)
|
||||
.await
|
||||
.map_err(Error::from_error_ext)?;
|
||||
}
|
||||
|
||||
self.data.state = AlterTableState::UpdateTableManifest;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
/// Persist the alteration to the manifest and update table info.
|
||||
async fn on_update_table_manifest(&mut self) -> Result<Status> {
|
||||
// Get current table info.
|
||||
let current_info = self.table.table_info();
|
||||
if current_info.ident.version > self.data.table_version {
|
||||
logging::info!(
|
||||
"table {} version is already updated, current: {}, old_version: {}",
|
||||
self.data.request.table_name,
|
||||
current_info.ident.version,
|
||||
self.data.table_version,
|
||||
);
|
||||
return Ok(Status::Done);
|
||||
}
|
||||
|
||||
self.init_new_info(¤t_info)?;
|
||||
let new_info = self.new_info.as_ref().unwrap();
|
||||
let table_name = &self.data.request.table_name;
|
||||
|
||||
logging::debug!(
|
||||
"start updating the manifest of table {} with new table info {:?}",
|
||||
table_name,
|
||||
new_info
|
||||
);
|
||||
|
||||
self.table
|
||||
.manifest()
|
||||
.update(TableMetaActionList::with_action(TableMetaAction::Change(
|
||||
Box::new(TableChange {
|
||||
table_info: RawTableInfo::from(new_info.clone()),
|
||||
}),
|
||||
)))
|
||||
.await
|
||||
.context(UpdateTableManifestSnafu { table_name })?;
|
||||
|
||||
// Update in memory metadata of the table.
|
||||
self.table.set_table_info(new_info.clone());
|
||||
|
||||
// Rename key in tables map.
|
||||
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
|
||||
let mut table_ref = self.data.table_ref();
|
||||
let mut tables = self.engine_inner.tables.write().unwrap();
|
||||
tables.remove(&table_ref.to_string());
|
||||
table_ref.table = new_table_name.as_str();
|
||||
tables.insert(table_ref.to_string(), self.table.clone());
|
||||
}
|
||||
|
||||
Ok(Status::Done)
|
||||
}
|
||||
|
||||
fn init_new_info(&mut self, current_info: &TableInfo) -> Result<()> {
|
||||
if self.new_info.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let table_name = ¤t_info.name;
|
||||
let mut new_info = TableInfo::clone(current_info);
|
||||
// setup new table info
|
||||
match &self.data.request.alter_kind {
|
||||
AlterKind::RenameTable { new_table_name } => {
|
||||
new_info.name = new_table_name.clone();
|
||||
}
|
||||
AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => {
|
||||
let table_meta = ¤t_info.meta;
|
||||
let new_meta = table_meta
|
||||
.builder_with_alter_kind(table_name, &self.data.request.alter_kind)
|
||||
.map_err(Error::from_error_ext)?
|
||||
.build()
|
||||
.context(BuildTableMetaSnafu { table_name })?;
|
||||
new_info.meta = new_meta;
|
||||
}
|
||||
}
|
||||
// Increase version of the table.
|
||||
new_info.ident.version = current_info.ident.version + 1;
|
||||
|
||||
self.new_info = Some(new_info);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents each step while altering table in the mito engine.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
enum AlterTableState {
|
||||
/// Prepare to alter table.
|
||||
Prepare,
|
||||
/// Alter regions.
|
||||
AlterRegions,
|
||||
/// Update table manifest.
|
||||
UpdateTableManifest,
|
||||
}
|
||||
|
||||
/// Serializable data of [AlterMitoTable].
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct AlterTableData {
|
||||
state: AlterTableState,
|
||||
request: AlterTableRequest,
|
||||
/// Table version before alteration.
|
||||
table_version: TableVersion,
|
||||
}
|
||||
|
||||
impl AlterTableData {
|
||||
fn table_ref(&self) -> TableReference {
|
||||
TableReference {
|
||||
catalog: &self.request.catalog_name,
|
||||
schema: &self.request.schema_name,
|
||||
table: &self.request.table_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use table::engine::{EngineContext, TableEngine, TableEngineProcedure};
|
||||
use table::requests::AddColumnRequest;
|
||||
|
||||
use super::*;
|
||||
use crate::engine::procedure::procedure_test_util::{self, TestEnv};
|
||||
use crate::table::test_util;
|
||||
|
||||
fn new_add_columns_req() -> AlterTableRequest {
|
||||
let new_tag = ColumnSchema::new("my_tag", ConcreteDataType::string_datatype(), true);
|
||||
let new_field = ColumnSchema::new("my_field", ConcreteDataType::string_datatype(), true);
|
||||
let alter_kind = AlterKind::AddColumns {
|
||||
columns: vec![
|
||||
AddColumnRequest {
|
||||
column_schema: new_tag,
|
||||
is_key: true,
|
||||
},
|
||||
AddColumnRequest {
|
||||
column_schema: new_field,
|
||||
is_key: false,
|
||||
},
|
||||
],
|
||||
};
|
||||
test_util::new_alter_request(alter_kind)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_procedure_add_column() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let TestEnv {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
} = procedure_test_util::setup_test_engine("create_procedure").await;
|
||||
let schema = Arc::new(test_util::schema_for_test());
|
||||
let request = test_util::new_create_request(schema.clone());
|
||||
|
||||
let engine_ctx = EngineContext::default();
|
||||
// Create table first.
|
||||
let mut procedure = table_engine
|
||||
.create_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Get metadata of the created table.
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let old_info = table.table_info();
|
||||
let old_meta = &old_info.meta;
|
||||
|
||||
// Alter the table.
|
||||
let request = new_add_columns_req();
|
||||
let mut procedure = table_engine
|
||||
.alter_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Validate.
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let new_info = table.table_info();
|
||||
let new_meta = &new_info.meta;
|
||||
let new_schema = &new_meta.schema;
|
||||
|
||||
assert_eq!(&[0, 4], &new_meta.primary_key_indices[..]);
|
||||
assert_eq!(&[1, 2, 3, 5], &new_meta.value_indices[..]);
|
||||
assert!(new_schema.column_schema_by_name("my_tag").is_some());
|
||||
assert!(new_schema.column_schema_by_name("my_field").is_some());
|
||||
assert_eq!(new_schema.version(), schema.version() + 1);
|
||||
assert_eq!(new_meta.next_column_id, old_meta.next_column_id + 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_procedure_drop_column() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let TestEnv {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
} = procedure_test_util::setup_test_engine("create_procedure").await;
|
||||
let schema = Arc::new(test_util::schema_for_test());
|
||||
let request = test_util::new_create_request(schema.clone());
|
||||
|
||||
let engine_ctx = EngineContext::default();
|
||||
// Create table first.
|
||||
let mut procedure = table_engine
|
||||
.create_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Add columns.
|
||||
let request = new_add_columns_req();
|
||||
let mut procedure = table_engine
|
||||
.alter_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Get metadata.
|
||||
let table_ref = TableReference {
|
||||
catalog: &request.catalog_name,
|
||||
schema: &request.schema_name,
|
||||
table: &request.table_name,
|
||||
};
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let old_info = table.table_info();
|
||||
let old_meta = &old_info.meta;
|
||||
|
||||
// Then remove memory and my_field from the table.
|
||||
let alter_kind = AlterKind::DropColumns {
|
||||
names: vec![String::from("memory"), String::from("my_field")],
|
||||
};
|
||||
let request = test_util::new_alter_request(alter_kind);
|
||||
let mut procedure = table_engine
|
||||
.alter_table_procedure(&engine_ctx, request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Validate.
|
||||
let new_info = table.table_info();
|
||||
let new_meta = &new_info.meta;
|
||||
let new_schema = &new_meta.schema;
|
||||
|
||||
let remaining_names: Vec<String> = new_schema
|
||||
.column_schemas()
|
||||
.iter()
|
||||
.map(|column_schema| column_schema.name.clone())
|
||||
.collect();
|
||||
assert_eq!(&["host", "cpu", "ts", "my_tag"], &remaining_names[..]);
|
||||
assert_eq!(&[0, 3], &new_meta.primary_key_indices[..]);
|
||||
assert_eq!(&[1, 2], &new_meta.value_indices[..]);
|
||||
assert_eq!(new_schema.version(), old_meta.schema.version() + 1);
|
||||
assert_eq!(new_meta.region_numbers, old_meta.region_numbers);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_procedure_rename_table() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let TestEnv {
|
||||
table_engine,
|
||||
dir: _dir,
|
||||
} = procedure_test_util::setup_test_engine("create_procedure").await;
|
||||
let schema = Arc::new(test_util::schema_for_test());
|
||||
let create_request = test_util::new_create_request(schema.clone());
|
||||
|
||||
let engine_ctx = EngineContext::default();
|
||||
// Create table first.
|
||||
let mut procedure = table_engine
|
||||
.create_table_procedure(&engine_ctx, create_request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Get metadata of the created table.
|
||||
let mut table_ref = TableReference {
|
||||
catalog: &create_request.catalog_name,
|
||||
schema: &create_request.schema_name,
|
||||
table: &create_request.table_name,
|
||||
};
|
||||
let table = table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
// Rename the table.
|
||||
let new_name = "another_table".to_string();
|
||||
let alter_kind = AlterKind::RenameTable {
|
||||
new_table_name: new_name.clone(),
|
||||
};
|
||||
let alter_request = test_util::new_alter_request(alter_kind);
|
||||
let mut procedure = table_engine
|
||||
.alter_table_procedure(&engine_ctx, alter_request.clone())
|
||||
.unwrap();
|
||||
procedure_test_util::execute_procedure_until_done(&mut procedure).await;
|
||||
|
||||
// Validate.
|
||||
let info = table.table_info();
|
||||
assert_eq!(new_name, info.name);
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.unwrap()
|
||||
.is_none());
|
||||
table_ref.table = &new_name;
|
||||
assert!(table_engine
|
||||
.get_table(&engine_ctx, &table_ref)
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
}
|
||||
@@ -43,7 +43,7 @@ pub(crate) struct CreateMitoTable<S: StorageEngine> {
|
||||
/// Created regions of the table.
|
||||
regions: HashMap<RegionNumber, S::Region>,
|
||||
/// Schema of the table.
|
||||
table_schema: Option<SchemaRef>,
|
||||
table_schema: SchemaRef,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -81,8 +81,14 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
const TYPE_NAME: &str = "mito::CreateMitoTable";
|
||||
|
||||
/// Returns a new [CreateMitoTable].
|
||||
pub(crate) fn new(request: CreateTableRequest, engine_inner: Arc<MitoEngineInner<S>>) -> Self {
|
||||
CreateMitoTable {
|
||||
pub(crate) fn new(
|
||||
request: CreateTableRequest,
|
||||
engine_inner: Arc<MitoEngineInner<S>>,
|
||||
) -> Result<Self> {
|
||||
let table_schema =
|
||||
Schema::try_from(request.schema.clone()).context(InvalidRawSchemaSnafu)?;
|
||||
|
||||
Ok(CreateMitoTable {
|
||||
data: CreateTableData {
|
||||
state: CreateTableState::Prepare,
|
||||
request,
|
||||
@@ -90,8 +96,8 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
},
|
||||
engine_inner,
|
||||
regions: HashMap::new(),
|
||||
table_schema: None,
|
||||
}
|
||||
table_schema: Arc::new(table_schema),
|
||||
})
|
||||
}
|
||||
|
||||
/// Register the loader of this procedure to the `procedure_manager`.
|
||||
@@ -115,12 +121,14 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
/// Recover the procedure from json.
|
||||
fn from_json(json: &str, engine_inner: Arc<MitoEngineInner<S>>) -> Result<Self> {
|
||||
let data: CreateTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
let table_schema =
|
||||
Schema::try_from(data.request.schema.clone()).context(InvalidRawSchemaSnafu)?;
|
||||
|
||||
Ok(CreateMitoTable {
|
||||
data,
|
||||
engine_inner,
|
||||
regions: HashMap::new(),
|
||||
table_schema: None,
|
||||
table_schema: Arc::new(table_schema),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -166,19 +174,17 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
ttl,
|
||||
};
|
||||
|
||||
let table_schema =
|
||||
Schema::try_from(self.data.request.schema.clone()).context(InvalidRawSchemaSnafu)?;
|
||||
let primary_key_indices = &self.data.request.primary_key_indices;
|
||||
let (next_column_id, default_cf) = engine::build_column_family(
|
||||
engine::INIT_COLUMN_ID,
|
||||
&self.data.request.table_name,
|
||||
&table_schema,
|
||||
&self.table_schema,
|
||||
primary_key_indices,
|
||||
)?;
|
||||
let (next_column_id, row_key) = engine::build_row_key_desc(
|
||||
next_column_id,
|
||||
&self.data.request.table_name,
|
||||
&table_schema,
|
||||
&self.table_schema,
|
||||
primary_key_indices,
|
||||
)?;
|
||||
self.data.next_column_id = Some(next_column_id);
|
||||
@@ -228,7 +234,6 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
|
||||
// All regions are created, moves to the next step.
|
||||
self.data.state = CreateTableState::WriteTableManifest;
|
||||
self.table_schema = Some(Arc::new(table_schema));
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -276,10 +281,9 @@ impl<S: StorageEngine> CreateMitoTable<S> {
|
||||
) -> Result<MitoTable<S::Region>> {
|
||||
// Safety: We are in `WriteTableManifest` state.
|
||||
let next_column_id = self.data.next_column_id.unwrap();
|
||||
let table_schema = self.table_schema.clone().unwrap();
|
||||
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(table_schema)
|
||||
.schema(self.table_schema.clone())
|
||||
.engine(engine::MITO_ENGINE)
|
||||
.next_column_id(next_column_id)
|
||||
.primary_key_indices(self.data.request.primary_key_indices.clone())
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::any::Any;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::prelude::*;
|
||||
use store_api::storage::RegionNumber;
|
||||
use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
|
||||
use table::metadata::{TableInfoBuilderError, TableMetaBuilderError, TableVersion};
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
@@ -187,6 +187,12 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Invalid schema, source: {}", source))]
|
||||
InvalidRawSchema { source: datatypes::error::Error },
|
||||
|
||||
#[snafu(display("Table version changed, expect: {}, actual: {}", expect, actual))]
|
||||
VersionChanged {
|
||||
expect: TableVersion,
|
||||
actual: TableVersion,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -211,7 +217,8 @@ impl ErrorExt for Error {
|
||||
| InvalidPrimaryKey { .. }
|
||||
| MissingTimestampIndex { .. }
|
||||
| TableNotFound { .. }
|
||||
| InvalidRawSchema { .. } => StatusCode::InvalidArguments,
|
||||
| InvalidRawSchema { .. }
|
||||
| VersionChanged { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
TableInfoNotFound { .. } | ConvertRaw { .. } => StatusCode::Unexpected,
|
||||
|
||||
|
||||
@@ -558,7 +558,7 @@ impl<R: Region> MitoTable<R> {
|
||||
}
|
||||
|
||||
/// Create [`AlterOperation`] according to given `alter_kind`.
|
||||
fn create_alter_operation(
|
||||
pub(crate) fn create_alter_operation(
|
||||
table_name: &str,
|
||||
alter_kind: &AlterKind,
|
||||
table_meta: &mut TableMeta,
|
||||
|
||||
@@ -29,7 +29,9 @@ use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::engine::{EngineContext, TableEngine};
|
||||
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType};
|
||||
use table::requests::{CreateTableRequest, InsertRequest, TableOptions};
|
||||
use table::requests::{
|
||||
AlterKind, AlterTableRequest, CreateTableRequest, InsertRequest, TableOptions,
|
||||
};
|
||||
use table::{Table, TableRef};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
@@ -118,6 +120,15 @@ pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_alter_request(alter_kind: AlterKind) -> AlterTableRequest {
|
||||
AlterTableRequest {
|
||||
catalog_name: "greptime".to_string(),
|
||||
schema_name: "public".to_string(),
|
||||
table_name: TABLE_NAME.to_string(),
|
||||
alter_kind,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestEngineComponents {
|
||||
pub table_engine: MitoEngine<EngineImpl<NoopLogStore>>,
|
||||
pub storage_engine: EngineImpl<NoopLogStore>,
|
||||
|
||||
@@ -121,6 +121,13 @@ pub trait TableEngineProcedure: Send + Sync {
|
||||
ctx: &EngineContext,
|
||||
request: CreateTableRequest,
|
||||
) -> Result<BoxedProcedure>;
|
||||
|
||||
/// Returns a procedure that alters table by specific `request`.
|
||||
fn alter_table_procedure(
|
||||
&self,
|
||||
ctx: &EngineContext,
|
||||
request: AlterTableRequest,
|
||||
) -> Result<BoxedProcedure>;
|
||||
}
|
||||
|
||||
pub type TableEngineProcedureRef = Arc<dyn TableEngineProcedure>;
|
||||
|
||||
@@ -136,7 +136,7 @@ pub struct OpenTableRequest {
|
||||
}
|
||||
|
||||
/// Alter table request
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlterTableRequest {
|
||||
pub catalog_name: String,
|
||||
pub schema_name: String,
|
||||
@@ -151,13 +151,13 @@ impl AlterTableRequest {
|
||||
}
|
||||
|
||||
/// Add column request
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AddColumnRequest {
|
||||
pub column_schema: ColumnSchema,
|
||||
pub is_key: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum AlterKind {
|
||||
AddColumns { columns: Vec<AddColumnRequest> },
|
||||
DropColumns { names: Vec<String> },
|
||||
|
||||
Reference in New Issue
Block a user