feat(mito): Combine the original and procedure's implementation (#1468)

* fix(mito): Add metrics to mito DDL procedure

* feat(mito): Use procedure's implementation to create table

* feat(mito): Use procedure's implementation to alter table

* feat(mito): Use procedure's implementation to drop table

* style(mito): Fix clippy

* test(mito): Fix tests

* feat(mito): Add TableCreator

* feat(mito): update alter table procedure

* fix(mito): alter procedure create alter op first

* feat(mito): Combine alter table code

* fix(mito): Fix deadlock

* feat(mito): Simplify drop table procedure
This commit is contained in:
Yingwen
2023-04-28 11:48:52 +08:00
committed by GitHub
parent 9e4887f29f
commit 51be35a7b1
8 changed files with 361 additions and 462 deletions

View File

@@ -234,6 +234,9 @@ impl EngineInner {
return if request.create_if_not_exists {
Ok(table)
} else {
// If the procedure retries this method, it is possible to return an error
// when the table is already created.
// TODO(yingwen): Refactor this like the mito engine.
TableExistsSnafu { table_name }.fail()
};
}

View File

@@ -21,10 +21,8 @@ use std::sync::Arc;
use async_trait::async_trait;
pub use common_catalog::consts::MITO_ENGINE;
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_procedure::{BoxedProcedure, ProcedureManager};
use common_telemetry::tracing::log::info;
use common_telemetry::{debug, logging};
use dashmap::DashMap;
use datatypes::schema::Schema;
@@ -33,26 +31,24 @@ use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
CreateOptions, EngineContext as StorageEngineContext, OpenOptions, Region,
RegionDescriptorBuilder, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine,
EngineContext as StorageEngineContext, OpenOptions, RowKeyDescriptor, RowKeyDescriptorBuilder,
StorageEngine,
};
use table::engine::{
region_id, region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure,
TableReference,
region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference,
};
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
use table::metadata::{TableInfo, TableVersion};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
use table::table::{AlterContext, TableRef};
use table::{error as table_error, Result as TableResult, Table};
use table::{error as table_error, Result as TableResult, Table, TableRef};
use crate::config::EngineConfig;
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable};
use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable, TableCreator};
use crate::error::{
self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu,
BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu,
MissingTimestampIndexSnafu, RegionNotFoundSnafu, Result, TableExistsSnafu,
BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRowKeyDescriptorSnafu,
InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu, RegionNotFoundSnafu, Result,
TableExistsSnafu,
};
use crate::manifest::TableManifest;
use crate::metrics;
@@ -93,12 +89,36 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
async fn create_table(
&self,
ctx: &EngineContext,
_ctx: &EngineContext,
request: CreateTableRequest,
) -> TableResult<TableRef> {
let _timer = common_telemetry::timer!(metrics::MITO_CREATE_TABLE_ELAPSED);
self.inner
.create_table(ctx, request)
validate_create_table_request(&request)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let table_ref = request.table_ref();
let _lock = self.inner.table_mutex.lock(table_ref.to_string()).await;
if let Some(table) = self.inner.get_mito_table(&table_ref) {
if request.create_if_not_exists {
return Ok(table);
} else {
return TableExistsSnafu {
table_name: request.table_name,
}
.fail()
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
}
let mut creator = TableCreator::new(request, self.inner.clone())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
creator
.create_table()
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
@@ -119,12 +139,33 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
async fn alter_table(
&self,
ctx: &EngineContext,
_ctx: &EngineContext,
req: AlterTableRequest,
) -> TableResult<TableRef> {
let _timer = common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED);
self.inner
.alter_table(ctx, req)
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
let mut table_ref = req.table_ref();
table_ref.table = new_table_name;
if self.inner.get_mito_table(&table_ref).is_some() {
return TableExistsSnafu {
table_name: table_ref.to_string(),
}
.fail()
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
}
}
let mut procedure = AlterMitoTable::new(req, self.inner.clone())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
// TODO(yingwen): Renaming has a concurrency issue without the procedure runtime, but
// users can't use this method to alter a table, so it is still safe. We should
// refactor the table engine to avoid using the table name as the key.
procedure
.engine_alter_table()
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
@@ -147,11 +188,7 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
_ctx: &EngineContext,
request: DropTableRequest,
) -> TableResult<bool> {
self.inner
.drop_table(request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
self.inner.drop_table(request).await
}
async fn close(&self) -> TableResult<()> {
@@ -345,148 +382,6 @@ fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> {
}
impl<S: StorageEngine> MitoEngineInner<S> {
/// Creates a table from `request` and registers it in `self.tables`.
///
/// Steps, in order:
/// 1. Validates the request and returns early if the table is already registered
///    (the existing table when `create_if_not_exists` is set, a `TableExists`
///    error otherwise).
/// 2. Builds the schema, the default column family and the row key descriptor.
/// 3. Takes the table mutex, checks for the table again, then creates one region
///    per entry of `request.region_numbers` through the storage engine.
/// 4. Builds the table meta/info, creates the `MitoTable` and inserts it into
///    `self.tables`.
async fn create_table(
&self,
_ctx: &EngineContext,
request: CreateTableRequest,
) -> Result<TableRef> {
let catalog_name = &request.catalog_name;
let schema_name = &request.schema_name;
let table_name = &request.table_name;
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};
validate_create_table_request(&request)?;
// First existence check, done before taking the table mutex.
if let Some(table) = self.get_table(&table_ref) {
if request.create_if_not_exists {
return Ok(table);
} else {
return TableExistsSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
}
.fail();
}
}
let table_schema =
Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?);
let primary_key_indices = &request.primary_key_indices;
// Column ids are handed out sequentially: the column family builder starts at
// INIT_COLUMN_ID and the row key builder continues from the id it returns.
let (next_column_id, default_cf) = build_column_family(
INIT_COLUMN_ID,
table_name,
&table_schema,
primary_key_indices,
)?;
let (next_column_id, row_key) = build_row_key_desc(
next_column_id,
table_name,
&table_schema,
primary_key_indices,
)?;
let table_id = request.id;
let table_dir = table_dir(catalog_name, schema_name, table_id);
let mut regions = HashMap::with_capacity(request.region_numbers.len());
// The guard is bound to `_lock`, so it is held until this function returns.
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
// Checks again, read lock should be enough since we are guarded by the mutex.
if let Some(table) = self.get_table(&table_ref) {
return if request.create_if_not_exists {
Ok(table)
} else {
// NOTE(review): this error carries the bare table name, while the first
// check above reports the full (catalog/schema qualified) table name.
TableExistsSnafu { table_name }.fail()
};
}
// Create every region listed in the request.
for region_number in &request.region_numbers {
let region_id = region_id(table_id, *region_number);
let region_name = region_name(table_id, *region_number);
let region_descriptor = RegionDescriptorBuilder::default()
.id(region_id)
.name(&region_name)
.row_key(row_key.clone())
.compaction_time_window(request.table_options.compaction_time_window)
.default_cf(default_cf.clone())
.build()
.context(BuildRegionDescriptorSnafu {
table_name,
region_name,
})?;
let opts = CreateOptions {
parent_dir: table_dir.clone(),
write_buffer_size: request
.table_options
.write_buffer_size
.map(|size| size.0 as usize),
ttl: request.table_options.ttl,
compaction_time_window: request.table_options.compaction_time_window,
};
let region = {
// The timer only covers the region creation inside this block.
let _timer = common_telemetry::timer!(crate::metrics::MITO_CREATE_REGION_ELAPSED);
self.storage_engine
.create_region(&StorageEngineContext::default(), region_descriptor, &opts)
.await
.map_err(BoxedError::new)
.context(error::CreateRegionSnafu)?
};
info!(
"Mito engine created region: {}, id: {}",
region.name(),
region.id()
);
regions.insert(*region_number, region);
}
let table_meta = TableMetaBuilder::default()
.schema(table_schema)
.engine(MITO_ENGINE)
.next_column_id(next_column_id)
.primary_key_indices(request.primary_key_indices.clone())
.options(request.table_options)
.region_numbers(request.region_numbers)
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
let table_info = TableInfoBuilder::new(table_name.clone(), table_meta)
.ident(table_id)
.table_version(INIT_TABLE_VERSION)
.table_type(TableType::Base)
.catalog_name(catalog_name.to_string())
.schema_name(schema_name.to_string())
.desc(request.desc)
.build()
.context(error::BuildTableInfoSnafu { table_name })?;
let table = Arc::new(
MitoTable::create(
table_name,
&table_dir,
table_info,
regions,
self.object_store.clone(),
)
.await?,
);
logging::info!(
"Mito engine created table: {} in schema: {}, table_id: {}.",
table_name,
schema_name,
table_id
);
// already locked
self.tables.insert(table_ref.to_string(), table.clone());
Ok(table)
}
async fn open_table(
&self,
_ctx: &EngineContext,
@@ -578,6 +473,27 @@ impl<S: StorageEngine> MitoEngineInner<S> {
Ok(table)
}
/// Removes the table from the engine and closes it.
///
/// Returns `Ok(true)` if a table was registered under the requested name and has
/// been closed, `Ok(false)` if no such table was registered.
async fn drop_table(&self, request: DropTableRequest) -> TableResult<bool> {
    let table_key = request.table_ref().to_string();

    // Take the table out of the engine first so users can no longer reach it.
    let _lock = self.table_mutex.lock(table_key.clone()).await;
    let Some((_, table)) = self.tables.remove(&table_key) else {
        return Ok(false);
    };

    // Closing the table closes all of its regions. Closing a region is idempotent.
    table
        .close()
        .await
        .map_err(BoxedError::new)
        .context(table_error::TableOperationSnafu)?;

    Ok(true)
}
async fn recover_table_manifest_and_info(
&self,
table_name: &str,
@@ -607,66 +523,6 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.map(|en| en.value().clone())
}
/// Alters the table named in `req` and returns it.
///
/// For a rename, fails with `TableExists` when the new name is already registered,
/// and after the alteration moves the entry in `self.tables` from the old name to
/// the new name. Fails with `TableNotFound` when the table to alter is not
/// registered.
async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result<TableRef> {
let catalog_name = &req.catalog_name;
let schema_name = &req.schema_name;
let table_name = &req.table_name;
// Reject the rename early if the target name is already taken.
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: new_table_name,
};
if self.get_table(&table_ref).is_some() {
return TableExistsSnafu {
table_name: table_ref.to_string(),
}
.fail();
}
}
let mut table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
};
let table = self
.get_mito_table(&table_ref)
.context(error::TableNotFoundSnafu { table_name })?;
logging::info!("start altering table {} with request {:?}", table_name, req);
table
.alter(AlterContext::new(), &req)
.await
.context(error::AlterTableSnafu { table_name })?;
// Re-key the table in the map under its new name.
// NOTE(review): the old key is removed and the new key is inserted under two
// separately acquired locks (one per name), so the two steps are not atomic.
if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
let removed = {
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
self.tables.remove(&table_ref.to_string())
};
ensure!(removed.is_some(), error::TableNotFoundSnafu { table_name });
table_ref.table = new_table_name.as_str();
let _lock = self.table_mutex.lock(table_ref.to_string()).await;
self.tables.insert(table_ref.to_string(), table.clone());
}
Ok(table)
}
/// Drops the table from the engine.
///
/// Returns `true` if a table was registered under the requested name and has been
/// removed, `false` if no such table existed.
async fn drop_table(&self, req: DropTableRequest) -> Result<bool> {
    let table_key = TableReference {
        catalog: &req.catalog_name,
        schema: &req.schema_name,
        table: &req.table_name,
    }
    .to_string();

    // todo(ruihang): reclaim persisted data
    let _lock = self.table_mutex.lock(table_key.clone()).await;
    let existed = self.tables.remove(&table_key).is_some();
    Ok(existed)
}
async fn close(&self) -> TableResult<()> {
futures::future::try_join_all(
self.tables

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
pub(crate) use alter::AlterMitoTable;
use common_procedure::ProcedureManager;
pub(crate) use create::CreateMitoTable;
pub(crate) use create::{CreateMitoTable, TableCreator};
pub(crate) use drop::DropMitoTable;
use store_api::storage::StorageEngine;

View File

@@ -18,21 +18,23 @@ use async_trait::async_trait;
use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status};
use common_telemetry::logging;
use common_telemetry::metric::Timer;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::Manifest;
use store_api::storage::{AlterRequest, Region, RegionMeta, StorageEngine};
use store_api::storage::{AlterOperation, StorageEngine};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableInfo, TableVersion};
use table::requests::{AlterKind, AlterTableRequest};
use table::Table;
use table::{Table, TableRef};
use crate::engine::MitoEngineInner;
use crate::error::{
BuildTableMetaSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu,
TableExistsSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu,
};
use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList};
use crate::table::{create_alter_operation, MitoTable};
use crate::metrics;
use crate::table::MitoTable;
/// Procedure to alter a [MitoTable].
pub(crate) struct AlterMitoTable<S: StorageEngine> {
@@ -41,6 +43,9 @@ pub(crate) struct AlterMitoTable<S: StorageEngine> {
table: Arc<MitoTable<S::Region>>,
/// The table info after alteration.
new_info: Option<TableInfo>,
/// The region alter operation.
alter_op: Option<AlterOperation>,
_timer: Timer,
}
#[async_trait]
@@ -52,8 +57,10 @@ impl<S: StorageEngine> Procedure for AlterMitoTable<S> {
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
match self.data.state {
AlterTableState::Prepare => self.on_prepare(),
AlterTableState::AlterRegions => self.on_alter_regions().await,
AlterTableState::UpdateTableManifest => self.on_update_table_manifest().await,
AlterTableState::EngineAlterTable => {
self.engine_alter_table().await?;
Ok(Status::Done)
}
}
}
@@ -114,6 +121,8 @@ impl<S: StorageEngine> AlterMitoTable<S> {
engine_inner,
table,
new_info: None,
alter_op: None,
_timer: common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED),
})
}
@@ -151,6 +160,8 @@ impl<S: StorageEngine> AlterMitoTable<S> {
engine_inner,
table,
new_info: None,
alter_op: None,
_timer: common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED),
})
}
@@ -165,81 +176,58 @@ impl<S: StorageEngine> AlterMitoTable<S> {
}
);
self.init_new_info(&current_info)?;
self.data.state = AlterTableState::AlterRegions;
if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind {
let mut table_ref = self.data.table_ref();
table_ref.table = new_table_name;
ensure!(
self.engine_inner.get_mito_table(&table_ref).is_none(),
TableExistsSnafu {
table_name: table_ref.to_string(),
}
);
}
self.data.state = AlterTableState::EngineAlterTable;
Ok(Status::executing(true))
}
/// Alters the table in the engine and returns the altered table.
///
/// If the table's current version is already greater than the version recorded in
/// the procedure data, the alteration is treated as already applied and the table
/// is returned as is. Otherwise this initializes the new table info and the region
/// alter operation, alters the regions and then updates the table manifest.
///
/// Note that calling this method directly (without submitting the procedure
/// to the manager) to rename a table might have concurrency issues when
/// two tables are being renamed to the same table name.
pub(crate) async fn engine_alter_table(&mut self) -> Result<TableRef> {
let current_info = self.table.table_info();
if current_info.ident.version > self.data.table_version {
// The table is already altered.
return Ok(self.table.clone());
}
// Must run before the two steps below: they rely on the new info and the
// alter operation computed here.
self.init_new_info_and_op(&current_info)?;
self.alter_regions().await?;
self.update_table_manifest().await
}
/// Alter regions.
async fn on_alter_regions(&mut self) -> Result<Status> {
let current_info = self.table.table_info();
ensure!(
current_info.ident.version == self.data.table_version,
VersionChangedSnafu {
expect: self.data.table_version,
actual: current_info.ident.version,
}
);
self.init_new_info(&current_info)?;
let new_info = self.new_info.as_mut().unwrap();
let table_name = &self.data.request.table_name;
let Some(alter_op) = create_alter_operation(table_name, &self.data.request.alter_kind, &mut new_info.meta)
.map_err(Error::from_error_ext)? else {
async fn alter_regions(&mut self) -> Result<()> {
let Some(alter_op) = &self.alter_op else {
// Don't need to alter the region.
self.data.state = AlterTableState::UpdateTableManifest;
return Ok(Status::executing(true));
return Ok(());
};
let regions = self.table.regions();
// For each region, alter it if its version is not updated.
for region in regions.values() {
let region_meta = region.in_memory_metadata();
if u64::from(region_meta.version()) > self.data.table_version {
// Region is already altered.
continue;
}
let alter_req = AlterRequest {
operation: alter_op.clone(),
version: region_meta.version(),
};
// Alter the region.
logging::debug!(
"start altering region {} of table {}, with request {:?}",
region.name(),
table_name,
alter_req,
);
region
.alter(alter_req)
.await
.map_err(Error::from_error_ext)?;
}
self.data.state = AlterTableState::UpdateTableManifest;
Ok(Status::executing(true))
let table_name = &self.data.request.table_name;
let table_version = self.data.table_version;
self.table
.alter_regions(table_name, table_version, alter_op)
.await
.map_err(Error::from_error_ext)
}
/// Persist the alteration to the manifest and update table info.
async fn on_update_table_manifest(&mut self) -> Result<Status> {
// Get current table info.
let current_info = self.table.table_info();
if current_info.ident.version > self.data.table_version {
logging::info!(
"table {} version is already updated, current: {}, old_version: {}",
self.data.request.table_name,
current_info.ident.version,
self.data.table_version,
);
return Ok(Status::Done);
}
self.init_new_info(&current_info)?;
async fn update_table_manifest(&mut self) -> Result<TableRef> {
// Safety: We init new info in engine_alter_table()
let new_info = self.new_info.as_ref().unwrap();
let table_name = &self.data.request.table_name;
@@ -249,6 +237,8 @@ impl<S: StorageEngine> AlterMitoTable<S> {
new_info
);
// It is possible that we write the manifest multiple times and bump the manifest
// version, but it is still correct as we always write the new table info.
self.table
.manifest()
.update(TableMetaActionList::with_action(TableMetaAction::Change(
@@ -278,35 +268,21 @@ impl<S: StorageEngine> AlterMitoTable<S> {
.insert(table_ref.to_string(), self.table.clone());
}
Ok(Status::Done)
Ok(self.table.clone())
}
fn init_new_info(&mut self, current_info: &TableInfo) -> Result<()> {
fn init_new_info_and_op(&mut self, current_info: &TableInfo) -> Result<()> {
if self.new_info.is_some() {
return Ok(());
}
let table_name = &current_info.name;
let mut new_info = TableInfo::clone(current_info);
// setup new table info
match &self.data.request.alter_kind {
AlterKind::RenameTable { new_table_name } => {
new_info.name = new_table_name.clone();
}
AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => {
let table_meta = &current_info.meta;
let new_meta = table_meta
.builder_with_alter_kind(table_name, &self.data.request.alter_kind)
.map_err(Error::from_error_ext)?
.build()
.context(BuildTableMetaSnafu { table_name })?;
new_info.meta = new_meta;
}
}
// Increase version of the table.
new_info.ident.version = current_info.ident.version + 1;
let (new_info, alter_op) = self
.table
.info_and_op_for_alter(current_info, &self.data.request.alter_kind)
.map_err(Error::from_error_ext)?;
self.new_info = Some(new_info);
self.alter_op = alter_op;
Ok(())
}
@@ -315,12 +291,10 @@ impl<S: StorageEngine> AlterMitoTable<S> {
/// Represents each step while altering table in the mito engine.
#[derive(Debug, Serialize, Deserialize)]
enum AlterTableState {
/// Prepare to alter table.
/// Prepare to alter the table.
Prepare,
/// Alter regions.
AlterRegions,
/// Update table manifest.
UpdateTableManifest,
/// Engine alters the table.
EngineAlterTable,
}
/// Serializable data of [AlterMitoTable].

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{Context, Error, LockKey, Procedure, ProcedureManager, Result, Status};
use common_telemetry::metric::Timer;
use datatypes::schema::{Schema, SchemaRef};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -28,22 +29,20 @@ use store_api::storage::{
use table::engine::{region_id, table_dir};
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::requests::CreateTableRequest;
use table::TableRef;
use crate::engine::{self, MitoEngineInner, TableReference};
use crate::error::{
BuildRegionDescriptorSnafu, BuildTableInfoSnafu, BuildTableMetaSnafu, InvalidRawSchemaSnafu,
TableExistsSnafu,
};
use crate::metrics;
use crate::table::MitoTable;
/// Procedure to create a [MitoTable].
pub(crate) struct CreateMitoTable<S: StorageEngine> {
data: CreateTableData,
engine_inner: Arc<MitoEngineInner<S>>,
/// Created regions of the table.
regions: HashMap<RegionNumber, S::Region>,
/// Schema of the table.
table_schema: SchemaRef,
creator: TableCreator<S>,
_timer: Timer,
}
#[async_trait]
@@ -53,21 +52,21 @@ impl<S: StorageEngine> Procedure for CreateMitoTable<S> {
}
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
match self.data.state {
match self.creator.data.state {
CreateTableState::Prepare => self.on_prepare(),
CreateTableState::CreateRegions => self.on_create_regions().await,
CreateTableState::WriteTableManifest => self.on_write_table_manifest().await,
CreateTableState::EngineCreateTable => self.on_engine_create_table().await,
}
}
fn dump(&self) -> Result<String> {
let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?;
let json = serde_json::to_string(&self.creator.data).context(ToJsonSnafu)?;
Ok(json)
}
fn lock_key(&self) -> LockKey {
let table_ref = self.data.table_ref();
let table_ref = self.creator.data.table_ref();
let keys = self
.creator
.data
.request
.region_numbers
@@ -85,18 +84,9 @@ impl<S: StorageEngine> CreateMitoTable<S> {
request: CreateTableRequest,
engine_inner: Arc<MitoEngineInner<S>>,
) -> Result<Self> {
let table_schema =
Schema::try_from(request.schema.clone()).context(InvalidRawSchemaSnafu)?;
Ok(CreateMitoTable {
data: CreateTableData {
state: CreateTableState::Prepare,
request,
next_column_id: None,
},
engine_inner,
regions: HashMap::new(),
table_schema: Arc::new(table_schema),
creator: TableCreator::new(request, engine_inner)?,
_timer: common_telemetry::timer!(metrics::MITO_CREATE_TABLE_ELAPSED),
})
}
@@ -125,20 +115,23 @@ impl<S: StorageEngine> CreateMitoTable<S> {
Schema::try_from(data.request.schema.clone()).context(InvalidRawSchemaSnafu)?;
Ok(CreateMitoTable {
data,
engine_inner,
regions: HashMap::new(),
table_schema: Arc::new(table_schema),
creator: TableCreator {
data,
engine_inner,
regions: HashMap::new(),
table_schema: Arc::new(table_schema),
},
_timer: common_telemetry::timer!(metrics::MITO_CREATE_TABLE_ELAPSED),
})
}
/// Checks whether the table exists.
fn on_prepare(&mut self) -> Result<Status> {
let table_ref = self.data.table_ref();
if self.engine_inner.get_table(&table_ref).is_some() {
let table_ref = self.creator.data.table_ref();
if self.creator.engine_inner.get_table(&table_ref).is_some() {
// If the table already exists.
ensure!(
self.data.request.create_if_not_exists,
self.creator.data.request.create_if_not_exists,
TableExistsSnafu {
table_name: table_ref.to_string(),
}
@@ -147,31 +140,97 @@ impl<S: StorageEngine> CreateMitoTable<S> {
return Ok(Status::Done);
}
self.data.state = CreateTableState::CreateRegions;
self.creator.data.state = CreateTableState::EngineCreateTable;
Ok(Status::executing(true))
}
/// Creates regions for the table.
async fn on_create_regions(&mut self) -> Result<Status> {
let engine_ctx = EngineContext::default();
/// Creates the table while holding the table lock, then finishes the procedure.
async fn on_engine_create_table(&mut self) -> Result<Status> {
    // Reaching this state means we are able to create a new table.
    let lock_key = self.creator.data.table_ref().to_string();
    let table_mutex = &self.creator.engine_inner.table_mutex;
    let _lock = table_mutex.lock(lock_key).await;

    self.creator.create_table().await?;

    Ok(Status::Done)
}
}
/// Mito table creator.
///
/// Bundles the serializable creation data with the engine state and the
/// intermediate results (regions, schema) needed to create a mito table.
pub(crate) struct TableCreator<S: StorageEngine> {
/// Creation state, the create table request and the next column id.
data: CreateTableData,
/// Engine internals, used to reach the storage engine, the table map and the
/// table mutex.
engine_inner: Arc<MitoEngineInner<S>>,
/// Created regions of the table.
regions: HashMap<RegionNumber, S::Region>,
/// Schema of the table.
table_schema: SchemaRef,
}
impl<S: StorageEngine> TableCreator<S> {
/// Returns a new [TableCreator].
pub(crate) fn new(
request: CreateTableRequest,
engine_inner: Arc<MitoEngineInner<S>>,
) -> Result<Self> {
let table_schema =
Schema::try_from(request.schema.clone()).context(InvalidRawSchemaSnafu)?;
Ok(TableCreator {
data: CreateTableData {
state: CreateTableState::Prepare,
request,
next_column_id: None,
},
engine_inner,
regions: HashMap::new(),
table_schema: Arc::new(table_schema),
})
}
/// Creates a new mito table or returns the table if it already exists.
///
/// When the table is not registered yet, this creates its regions and then writes
/// the table manifest, returning the newly created table.
///
/// # Note
/// - Callers MUST acquire the table lock first.
/// - The procedure may call this method multiple times.
pub(crate) async fn create_table(&mut self) -> Result<TableRef> {
    // It is possible that the procedure retries `CREATE TABLE` many times, so we
    // return the table if it exists.
    let table_ref = self.data.table_ref();
    if let Some(table) = self.engine_inner.get_table(&table_ref) {
        // `get_table` already hands out an owned reference; no extra clone needed.
        return Ok(table);
    }

    // Only build the table directory once we know the table has to be created.
    let table_dir = table_dir(
        &self.data.request.catalog_name,
        &self.data.request.schema_name,
        self.data.request.id,
    );

    self.create_regions(&table_dir).await?;

    self.write_table_manifest(&table_dir).await
}
/// Creates regions for the table.
async fn create_regions(&mut self, table_dir: &str) -> Result<()> {
let table_options = &self.data.request.table_options;
let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
let ttl = table_options.ttl;
let compaction_time_window = table_options.compaction_time_window;
let open_opts = OpenOptions {
parent_dir: table_dir.clone(),
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_time_window,
};
let create_opts = CreateOptions {
parent_dir: table_dir,
parent_dir: table_dir.to_string(),
write_buffer_size,
ttl,
compaction_time_window,
@@ -193,6 +252,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
self.data.next_column_id = Some(next_column_id);
// Try to open all regions and collect the regions not exist.
let engine_ctx = EngineContext::default();
for number in &self.data.request.region_numbers {
if self.regions.contains_key(number) {
// Region is opened.
@@ -226,55 +286,47 @@ impl<S: StorageEngine> CreateMitoTable<S> {
region_name,
})?;
let region = self
.engine_inner
.storage_engine
.create_region(&engine_ctx, region_desc, &create_opts)
.await
.map_err(Error::from_error_ext)?;
let region = {
let _timer = common_telemetry::timer!(crate::metrics::MITO_CREATE_REGION_ELAPSED);
self.engine_inner
.storage_engine
.create_region(&engine_ctx, region_desc, &create_opts)
.await
.map_err(Error::from_error_ext)?
};
self.regions.insert(*number, region);
}
// All regions are created, moves to the next step.
self.data.state = CreateTableState::WriteTableManifest;
Ok(Status::executing(true))
Ok(())
}
/// Writes metadata to the table manifest.
async fn on_write_table_manifest(&mut self) -> Result<Status> {
let table_dir = table_dir(
&self.data.request.catalog_name,
&self.data.request.schema_name,
self.data.request.id,
);
async fn write_table_manifest(&mut self, table_dir: &str) -> Result<TableRef> {
// Try to open the table first, as the table manifest might already exist.
let table_ref = self.data.table_ref();
if let Some((manifest, table_info)) = self
.engine_inner
.recover_table_manifest_and_info(&self.data.request.table_name, &table_dir)
.recover_table_manifest_and_info(&self.data.request.table_name, table_dir)
.await?
{
let table = Arc::new(MitoTable::new(table_info, self.regions.clone(), manifest));
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
self.engine_inner
.tables
.insert(table_ref.to_string(), table);
return Ok(Status::Done);
.insert(table_ref.to_string(), table.clone());
return Ok(table);
}
// We need to persist the table manifest and create the table.
let table = self.write_manifest_and_create_table(&table_dir).await?;
let table = self.write_manifest_and_create_table(table_dir).await?;
let table = Arc::new(table);
let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string());
self.engine_inner
.tables
.insert(table_ref.to_string(), table);
.insert(table_ref.to_string(), table.clone());
Ok(Status::Done)
Ok(table)
}
/// Write metadata to the table manifest and return the created table.
@@ -282,7 +334,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
&self,
table_dir: &str,
) -> Result<MitoTable<S::Region>> {
// Safety: We are in `WriteTableManifest` state.
// Safety: next_column_id is always Some when calling this method.
let next_column_id = self.data.next_column_id.unwrap();
let table_meta = TableMetaBuilder::default()
@@ -325,12 +377,10 @@ impl<S: StorageEngine> CreateMitoTable<S> {
/// Represents each step while creating table in the mito engine.
#[derive(Debug, Serialize, Deserialize)]
enum CreateTableState {
/// Prepare to create region.
/// Prepare to create the table.
Prepare,
/// Create regions.
CreateRegions,
/// Write metadata to table manifest.
WriteTableManifest,
/// Engine creates the table.
EngineCreateTable,
}
/// Serializable data of [CreateMitoTable].
@@ -340,7 +390,7 @@ struct CreateTableData {
request: CreateTableRequest,
/// Next id for column.
///
/// Available in [CreateTableState::WriteTableManifest] state.
/// Set by [TableCreator::create_regions].
next_column_id: Option<ColumnId>,
}

View File

@@ -43,7 +43,14 @@ impl<S: StorageEngine> Procedure for DropMitoTable<S> {
async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
match self.data.state {
DropTableState::Prepare => self.on_prepare(),
DropTableState::CloseRegions => self.on_close_regions().await,
DropTableState::EngineDropTable => {
self.engine_inner
.drop_table(self.data.request.clone())
.await
.map_err(Error::from_error_ext)?;
Ok(Status::Done)
}
}
}
@@ -120,42 +127,19 @@ impl<S: StorageEngine> DropMitoTable<S> {
/// Prepare table info.
fn on_prepare(&mut self) -> Result<Status> {
self.data.state = DropTableState::CloseRegions;
self.data.state = DropTableState::EngineDropTable;
Ok(Status::executing(true))
}
/// Close all regions.
///
/// Removes the table from the engine's table map while holding the table mutex,
/// then closes `self.table` if it is set. Returns [Status::Done] on success.
async fn on_close_regions(&mut self) -> Result<Status> {
// Remove the table from the engine to avoid further access from users.
let table_ref = self.data.table_ref();
// The guard is bound to `_lock`, so it is held until this function returns.
let _lock = self
.engine_inner
.table_mutex
.lock(table_ref.to_string())
.await;
self.engine_inner.tables.remove(&table_ref.to_string());
// Close the table to close all regions. Closing a region is idempotent.
if let Some(table) = &self.table {
table.close().await.map_err(Error::from_error_ext)?;
}
// TODO(yingwen): Currently, DROP TABLE doesn't remove data. We can
// write a drop meta update to the table and remove all files in the
// background.
Ok(Status::Done)
}
}
/// Represents each step while dropping table in the mito engine.
#[derive(Debug, Serialize, Deserialize)]
enum DropTableState {
/// Prepare to drop table.
/// Prepare to drop the table.
Prepare,
/// Close regions of this table.
CloseRegions,
/// Engine drops the table.
EngineDropTable,
}
/// Serializable data of [DropMitoTable].

View File

@@ -31,9 +31,12 @@ use storage::region::RegionImpl;
use storage::EngineImpl;
use store_api::manifest::Manifest;
use store_api::storage::ReadContext;
use table::engine::region_id;
use table::metadata::TableType;
use table::requests::{
AddColumnRequest, AlterKind, DeleteRequest, FlushTableRequest, TableOptions,
};
use table::Table;
use super::*;
use crate::table::test_util::{

View File

@@ -43,7 +43,7 @@ use table::error::{
InvalidTableSnafu, RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu,
};
use table::metadata::{
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, TableVersion,
};
use table::requests::{
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest,
@@ -222,29 +222,14 @@ impl<R: Region> Table for MitoTable<R> {
let _lock = self.alter_lock.lock().await;
let table_info = self.table_info();
let table_version = table_info.ident.version;
let (new_info, alter_op) = self.info_and_op_for_alter(&table_info, &req.alter_kind)?;
let table_name = &table_info.name;
let mut new_info = TableInfo::clone(&*table_info);
// setup new table info
match &req.alter_kind {
AlterKind::RenameTable { new_table_name } => {
new_info.name = new_table_name.clone();
}
AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => {
let table_meta = &table_info.meta;
let new_meta = table_meta
.builder_with_alter_kind(table_name, &req.alter_kind)?
.build()
.context(error::BuildTableMetaSnafu { table_name })
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
new_info.meta = new_meta;
}
if let Some(alter_op) = &alter_op {
self.alter_regions(table_name, table_version, alter_op)
.await?;
}
// Do create_alter_operation first to bump next_column_id in meta.
let alter_op = create_alter_operation(table_name, &req.alter_kind, &mut new_info.meta)?;
// Increase version of the table.
new_info.ident.version = table_info.ident.version + 1;
// Persist the alteration to the manifest.
logging::debug!(
@@ -265,30 +250,6 @@ impl<R: Region> Table for MitoTable<R> {
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
if let Some(alter_op) = alter_op {
// TODO(yingwen): Error handling. Maybe the region need to provide a method to
// validate the request first.
let regions = self.regions();
for region in regions.values() {
let region_meta = region.in_memory_metadata();
let alter_req = AlterRequest {
operation: alter_op.clone(),
version: region_meta.version(),
};
// Alter the region.
logging::debug!(
"start altering region {} of table {}, with request {:?}",
region.name(),
table_name,
alter_req,
);
region
.alter(alter_req)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
}
}
// Update in memory metadata of the table.
self.set_table_info(new_info);
@@ -562,6 +523,74 @@ impl<R: Region> MitoTable<R> {
// TODO(dennis): use manifest version in catalog ?
(manifest::MIN_VERSION, manifest::MAX_VERSION)
}
/// Applies `alter_op` to every region of this table that has not been altered yet.
///
/// A region whose in-memory metadata version is already greater than
/// `table_version` is treated as altered and skipped. The first region that
/// fails to alter aborts the loop and its error is returned, wrapped as a
/// table operation error.
pub(crate) async fn alter_regions(
    &self,
    table_name: &str,
    table_version: TableVersion,
    alter_op: &AlterOperation,
) -> TableResult<()> {
    let region_map = self.regions();
    for region in region_map.values() {
        let metadata = region.in_memory_metadata();

        // Region is already altered.
        let already_altered = u64::from(metadata.version()) > table_version;
        if already_altered {
            continue;
        }

        // The request carries the region's current version alongside the operation.
        let request = AlterRequest {
            operation: alter_op.clone(),
            version: metadata.version(),
        };

        // Alter the region.
        logging::debug!(
            "start altering region {} of table {}, with request {:?}",
            region.name(),
            table_name,
            request,
        );
        region
            .alter(request)
            .await
            .map_err(BoxedError::new)
            .context(TableOperationSnafu)?;
    }

    Ok(())
}
/// Computes the table info that results from applying `alter_kind` to
/// `current_info`, plus the alter operation produced for that change, if any.
///
/// The returned info is a modified copy: `current_info` itself is not changed.
/// Its version is `current_info`'s version plus one.
pub(crate) fn info_and_op_for_alter(
    &self,
    current_info: &TableInfo,
    alter_kind: &AlterKind,
) -> TableResult<(TableInfo, Option<AlterOperation>)> {
    let table_name = &current_info.name;
    let mut altered_info: TableInfo = current_info.clone();

    // Apply the alteration to the copied table info.
    match alter_kind {
        AlterKind::RenameTable { new_table_name } => {
            altered_info.name = new_table_name.clone();
        }
        AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => {
            // Rebuild the table meta from the current one with the column change applied.
            altered_info.meta = current_info
                .meta
                .builder_with_alter_kind(table_name, alter_kind)?
                .build()
                .context(error::BuildTableMetaSnafu { table_name })
                .map_err(BoxedError::new)
                .context(table_error::TableOperationSnafu)?;
        }
    }

    // Increase version of the table.
    altered_info.ident.version = current_info.ident.version + 1;

    // Create the alter operation against the new meta; this call bumps
    // next_column_id in the meta, so it must see the meta built above.
    let alter_op = create_alter_operation(table_name, alter_kind, &mut altered_info.meta)?;

    Ok((altered_info, alter_op))
}
}
/// Create [`AlterOperation`] according to given `alter_kind`.