diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs index 0a20faa133..0e1176ee15 100644 --- a/src/file-table-engine/src/engine/immutable.rs +++ b/src/file-table-engine/src/engine/immutable.rs @@ -234,6 +234,9 @@ impl EngineInner { return if request.create_if_not_exists { Ok(table) } else { + // If the procedure retry this method. It is possible to return error + // when the table is already created. + // TODO(yingwen): Refactor this like the mito engine. TableExistsSnafu { table_name }.fail() }; } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 0fd90a32cb..4eac7a1da2 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -21,10 +21,8 @@ use std::sync::Arc; use async_trait::async_trait; pub use common_catalog::consts::MITO_ENGINE; -use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_procedure::{BoxedProcedure, ProcedureManager}; -use common_telemetry::tracing::log::info; use common_telemetry::{debug, logging}; use dashmap::DashMap; use datatypes::schema::Schema; @@ -33,26 +31,24 @@ use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, - CreateOptions, EngineContext as StorageEngineContext, OpenOptions, Region, - RegionDescriptorBuilder, RowKeyDescriptor, RowKeyDescriptorBuilder, StorageEngine, + EngineContext as StorageEngineContext, OpenOptions, RowKeyDescriptor, RowKeyDescriptorBuilder, + StorageEngine, }; use table::engine::{ - region_id, region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, - TableReference, + region_name, table_dir, EngineContext, TableEngine, TableEngineProcedure, TableReference, }; -use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}; +use table::metadata::{TableInfo, TableVersion}; use table::requests::{ AlterKind, 
AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, }; -use table::table::{AlterContext, TableRef}; -use table::{error as table_error, Result as TableResult, Table}; +use table::{error as table_error, Result as TableResult, Table, TableRef}; use crate::config::EngineConfig; -use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable}; +use crate::engine::procedure::{AlterMitoTable, CreateMitoTable, DropMitoTable, TableCreator}; use crate::error::{ - self, BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRegionDescriptorSnafu, - BuildRowKeyDescriptorSnafu, InvalidPrimaryKeySnafu, InvalidRawSchemaSnafu, - MissingTimestampIndexSnafu, RegionNotFoundSnafu, Result, TableExistsSnafu, + BuildColumnDescriptorSnafu, BuildColumnFamilyDescriptorSnafu, BuildRowKeyDescriptorSnafu, + InvalidPrimaryKeySnafu, MissingTimestampIndexSnafu, RegionNotFoundSnafu, Result, + TableExistsSnafu, }; use crate::manifest::TableManifest; use crate::metrics; @@ -93,12 +89,36 @@ impl TableEngine for MitoEngine { async fn create_table( &self, - ctx: &EngineContext, + _ctx: &EngineContext, request: CreateTableRequest, ) -> TableResult { let _timer = common_telemetry::timer!(metrics::MITO_CREATE_TABLE_ELAPSED); - self.inner - .create_table(ctx, request) + + validate_create_table_request(&request) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + let table_ref = request.table_ref(); + let _lock = self.inner.table_mutex.lock(table_ref.to_string()).await; + if let Some(table) = self.inner.get_mito_table(&table_ref) { + if request.create_if_not_exists { + return Ok(table); + } else { + return TableExistsSnafu { + table_name: request.table_name, + } + .fail() + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + } + } + + let mut creator = TableCreator::new(request, self.inner.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + creator + .create_table() .await 
.map_err(BoxedError::new) .context(table_error::TableOperationSnafu) @@ -119,12 +139,33 @@ impl TableEngine for MitoEngine { async fn alter_table( &self, - ctx: &EngineContext, + _ctx: &EngineContext, req: AlterTableRequest, ) -> TableResult { let _timer = common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED); - self.inner - .alter_table(ctx, req) + + if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { + let mut table_ref = req.table_ref(); + table_ref.table = new_table_name; + if self.inner.get_mito_table(&table_ref).is_some() { + return TableExistsSnafu { + table_name: table_ref.to_string(), + } + .fail() + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + } + } + + let mut procedure = AlterMitoTable::new(req, self.inner.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + // TODO(yingwen): Rename has concurrent issue without the procedure runtime. But + // users can't use this method to alter a table so it is still safe. We should + // refactor the table engine to avoid using table name as key. 
+ procedure + .engine_alter_table() .await .map_err(BoxedError::new) .context(table_error::TableOperationSnafu) @@ -147,11 +188,7 @@ impl TableEngine for MitoEngine { _ctx: &EngineContext, request: DropTableRequest, ) -> TableResult { - self.inner - .drop_table(request) - .await - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu) + self.inner.drop_table(request).await } async fn close(&self) -> TableResult<()> { @@ -345,148 +382,6 @@ fn validate_create_table_request(request: &CreateTableRequest) -> Result<()> { } impl MitoEngineInner { - async fn create_table( - &self, - _ctx: &EngineContext, - request: CreateTableRequest, - ) -> Result { - let catalog_name = &request.catalog_name; - let schema_name = &request.schema_name; - let table_name = &request.table_name; - let table_ref = TableReference { - catalog: catalog_name, - schema: schema_name, - table: table_name, - }; - - validate_create_table_request(&request)?; - - if let Some(table) = self.get_table(&table_ref) { - if request.create_if_not_exists { - return Ok(table); - } else { - return TableExistsSnafu { - table_name: format_full_table_name(catalog_name, schema_name, table_name), - } - .fail(); - } - } - - let table_schema = - Arc::new(Schema::try_from(request.schema).context(InvalidRawSchemaSnafu)?); - let primary_key_indices = &request.primary_key_indices; - let (next_column_id, default_cf) = build_column_family( - INIT_COLUMN_ID, - table_name, - &table_schema, - primary_key_indices, - )?; - let (next_column_id, row_key) = build_row_key_desc( - next_column_id, - table_name, - &table_schema, - primary_key_indices, - )?; - - let table_id = request.id; - let table_dir = table_dir(catalog_name, schema_name, table_id); - let mut regions = HashMap::with_capacity(request.region_numbers.len()); - - let _lock = self.table_mutex.lock(table_ref.to_string()).await; - // Checks again, read lock should be enough since we are guarded by the mutex. 
- if let Some(table) = self.get_table(&table_ref) { - return if request.create_if_not_exists { - Ok(table) - } else { - TableExistsSnafu { table_name }.fail() - }; - } - - for region_number in &request.region_numbers { - let region_id = region_id(table_id, *region_number); - - let region_name = region_name(table_id, *region_number); - let region_descriptor = RegionDescriptorBuilder::default() - .id(region_id) - .name(&region_name) - .row_key(row_key.clone()) - .compaction_time_window(request.table_options.compaction_time_window) - .default_cf(default_cf.clone()) - .build() - .context(BuildRegionDescriptorSnafu { - table_name, - region_name, - })?; - let opts = CreateOptions { - parent_dir: table_dir.clone(), - write_buffer_size: request - .table_options - .write_buffer_size - .map(|size| size.0 as usize), - ttl: request.table_options.ttl, - compaction_time_window: request.table_options.compaction_time_window, - }; - - let region = { - let _timer = common_telemetry::timer!(crate::metrics::MITO_CREATE_REGION_ELAPSED); - self.storage_engine - .create_region(&StorageEngineContext::default(), region_descriptor, &opts) - .await - .map_err(BoxedError::new) - .context(error::CreateRegionSnafu)? 
- }; - info!( - "Mito engine created region: {}, id: {}", - region.name(), - region.id() - ); - regions.insert(*region_number, region); - } - - let table_meta = TableMetaBuilder::default() - .schema(table_schema) - .engine(MITO_ENGINE) - .next_column_id(next_column_id) - .primary_key_indices(request.primary_key_indices.clone()) - .options(request.table_options) - .region_numbers(request.region_numbers) - .build() - .context(error::BuildTableMetaSnafu { table_name })?; - - let table_info = TableInfoBuilder::new(table_name.clone(), table_meta) - .ident(table_id) - .table_version(INIT_TABLE_VERSION) - .table_type(TableType::Base) - .catalog_name(catalog_name.to_string()) - .schema_name(schema_name.to_string()) - .desc(request.desc) - .build() - .context(error::BuildTableInfoSnafu { table_name })?; - - let table = Arc::new( - MitoTable::create( - table_name, - &table_dir, - table_info, - regions, - self.object_store.clone(), - ) - .await?, - ); - - logging::info!( - "Mito engine created table: {} in schema: {}, table_id: {}.", - table_name, - schema_name, - table_id - ); - - // already locked - self.tables.insert(table_ref.to_string(), table.clone()); - - Ok(table) - } - async fn open_table( &self, _ctx: &EngineContext, @@ -578,6 +473,27 @@ impl MitoEngineInner { Ok(table) } + async fn drop_table(&self, request: DropTableRequest) -> TableResult { + // Remove the table from the engine to avoid further access from users. + let table_ref = request.table_ref(); + + let _lock = self.table_mutex.lock(table_ref.to_string()).await; + let removed_table = self.tables.remove(&table_ref.to_string()); + + // Close the table to close all regions. Closing a region is idempotent. 
+ if let Some((_, table)) = &removed_table { + table + .close() + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + Ok(true) + } else { + Ok(false) + } + } + async fn recover_table_manifest_and_info( &self, table_name: &str, @@ -607,66 +523,6 @@ impl MitoEngineInner { .map(|en| en.value().clone()) } - async fn alter_table(&self, _ctx: &EngineContext, req: AlterTableRequest) -> Result { - let catalog_name = &req.catalog_name; - let schema_name = &req.schema_name; - let table_name = &req.table_name; - - if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { - let table_ref = TableReference { - catalog: catalog_name, - schema: schema_name, - table: new_table_name, - }; - - if self.get_table(&table_ref).is_some() { - return TableExistsSnafu { - table_name: table_ref.to_string(), - } - .fail(); - } - } - - let mut table_ref = TableReference { - catalog: catalog_name, - schema: schema_name, - table: table_name, - }; - let table = self - .get_mito_table(&table_ref) - .context(error::TableNotFoundSnafu { table_name })?; - - logging::info!("start altering table {} with request {:?}", table_name, req); - table - .alter(AlterContext::new(), &req) - .await - .context(error::AlterTableSnafu { table_name })?; - - if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { - let removed = { - let _lock = self.table_mutex.lock(table_ref.to_string()).await; - self.tables.remove(&table_ref.to_string()) - }; - ensure!(removed.is_some(), error::TableNotFoundSnafu { table_name }); - table_ref.table = new_table_name.as_str(); - let _lock = self.table_mutex.lock(table_ref.to_string()).await; - self.tables.insert(table_ref.to_string(), table.clone()); - } - Ok(table) - } - - /// Drop table. Returns whether a table is dropped (true) or not exist (false). 
- async fn drop_table(&self, req: DropTableRequest) -> Result { - let table_reference = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &req.table_name, - }; - // todo(ruihang): reclaim persisted data - let _lock = self.table_mutex.lock(table_reference.to_string()).await; - Ok(self.tables.remove(&table_reference.to_string()).is_some()) - } - async fn close(&self) -> TableResult<()> { futures::future::try_join_all( self.tables diff --git a/src/mito/src/engine/procedure.rs b/src/mito/src/engine/procedure.rs index 7ee86747fa..4d3a8b24d0 100644 --- a/src/mito/src/engine/procedure.rs +++ b/src/mito/src/engine/procedure.rs @@ -20,7 +20,7 @@ use std::sync::Arc; pub(crate) use alter::AlterMitoTable; use common_procedure::ProcedureManager; -pub(crate) use create::CreateMitoTable; +pub(crate) use create::{CreateMitoTable, TableCreator}; pub(crate) use drop::DropMitoTable; use store_api::storage::StorageEngine; diff --git a/src/mito/src/engine/procedure/alter.rs b/src/mito/src/engine/procedure/alter.rs index 60f47b4089..a22a1c17d5 100644 --- a/src/mito/src/engine/procedure/alter.rs +++ b/src/mito/src/engine/procedure/alter.rs @@ -18,21 +18,23 @@ use async_trait::async_trait; use common_procedure::error::{Error, FromJsonSnafu, ToJsonSnafu}; use common_procedure::{Context, LockKey, Procedure, ProcedureManager, Result, Status}; use common_telemetry::logging; +use common_telemetry::metric::Timer; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::Manifest; -use store_api::storage::{AlterRequest, Region, RegionMeta, StorageEngine}; +use store_api::storage::{AlterOperation, StorageEngine}; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableInfo, TableVersion}; use table::requests::{AlterKind, AlterTableRequest}; -use table::Table; +use table::{Table, TableRef}; use crate::engine::MitoEngineInner; use crate::error::{ - BuildTableMetaSnafu, TableNotFoundSnafu, 
UpdateTableManifestSnafu, VersionChangedSnafu, + TableExistsSnafu, TableNotFoundSnafu, UpdateTableManifestSnafu, VersionChangedSnafu, }; use crate::manifest::action::{TableChange, TableMetaAction, TableMetaActionList}; -use crate::table::{create_alter_operation, MitoTable}; +use crate::metrics; +use crate::table::MitoTable; /// Procedure to alter a [MitoTable]. pub(crate) struct AlterMitoTable { @@ -41,6 +43,9 @@ pub(crate) struct AlterMitoTable { table: Arc>, /// The table info after alteration. new_info: Option, + /// The region alter operation. + alter_op: Option, + _timer: Timer, } #[async_trait] @@ -52,8 +57,10 @@ impl Procedure for AlterMitoTable { async fn execute(&mut self, _ctx: &Context) -> Result { match self.data.state { AlterTableState::Prepare => self.on_prepare(), - AlterTableState::AlterRegions => self.on_alter_regions().await, - AlterTableState::UpdateTableManifest => self.on_update_table_manifest().await, + AlterTableState::EngineAlterTable => { + self.engine_alter_table().await?; + Ok(Status::Done) + } } } @@ -114,6 +121,8 @@ impl AlterMitoTable { engine_inner, table, new_info: None, + alter_op: None, + _timer: common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED), }) } @@ -151,6 +160,8 @@ impl AlterMitoTable { engine_inner, table, new_info: None, + alter_op: None, + _timer: common_telemetry::timer!(metrics::MITO_ALTER_TABLE_ELAPSED), }) } @@ -165,81 +176,58 @@ impl AlterMitoTable { } ); - self.init_new_info(&current_info)?; - self.data.state = AlterTableState::AlterRegions; + if let AlterKind::RenameTable { new_table_name } = &self.data.request.alter_kind { + let mut table_ref = self.data.table_ref(); + table_ref.table = new_table_name; + ensure!( + self.engine_inner.get_mito_table(&table_ref).is_none(), + TableExistsSnafu { + table_name: table_ref.to_string(), + } + ); + } + + self.data.state = AlterTableState::EngineAlterTable; Ok(Status::executing(true)) } + /// Engine alters the table. 
+ /// + /// Note that calling this method directly (without submitting the procedure + /// to the manager) to rename a table might have a concurrency issue when + /// we are renaming two tables to the same table name. + pub(crate) async fn engine_alter_table(&mut self) -> Result { + let current_info = self.table.table_info(); + if current_info.ident.version > self.data.table_version { + // The table is already altered. + return Ok(self.table.clone()); + } + self.init_new_info_and_op(&current_info)?; + + self.alter_regions().await?; + + self.update_table_manifest().await + } + /// Alter regions. - async fn on_alter_regions(&mut self) -> Result { - let current_info = self.table.table_info(); - ensure!( - current_info.ident.version == self.data.table_version, - VersionChangedSnafu { - expect: self.data.table_version, - actual: current_info.ident.version, - } - ); - - self.init_new_info(&current_info)?; - let new_info = self.new_info.as_mut().unwrap(); - let table_name = &self.data.request.table_name; - - let Some(alter_op) = create_alter_operation(table_name, &self.data.request.alter_kind, &mut new_info.meta) - .map_err(Error::from_error_ext)? else { + async fn alter_regions(&mut self) -> Result<()> { + let Some(alter_op) = &self.alter_op else { // Don't need to alter the region. - self.data.state = AlterTableState::UpdateTableManifest; - - return Ok(Status::executing(true)); + return Ok(()); }; - let regions = self.table.regions(); - // For each region, alter it if its version is not updated. - for region in regions.values() { - let region_meta = region.in_memory_metadata(); - if u64::from(region_meta.version()) > self.data.table_version { - // Region is already altered. - continue; - } - - let alter_req = AlterRequest { - operation: alter_op.clone(), - version: region_meta.version(), - }; - // Alter the region. 
- logging::debug!( - "start altering region {} of table {}, with request {:?}", - region.name(), - table_name, - alter_req, - ); - region - .alter(alter_req) - .await - .map_err(Error::from_error_ext)?; - } - - self.data.state = AlterTableState::UpdateTableManifest; - - Ok(Status::executing(true)) + let table_name = &self.data.request.table_name; + let table_version = self.data.table_version; + self.table + .alter_regions(table_name, table_version, alter_op) + .await + .map_err(Error::from_error_ext) } /// Persist the alteration to the manifest and update table info. - async fn on_update_table_manifest(&mut self) -> Result { - // Get current table info. - let current_info = self.table.table_info(); - if current_info.ident.version > self.data.table_version { - logging::info!( - "table {} version is already updated, current: {}, old_version: {}", - self.data.request.table_name, - current_info.ident.version, - self.data.table_version, - ); - return Ok(Status::Done); - } - - self.init_new_info(&current_info)?; + async fn update_table_manifest(&mut self) -> Result { + // Safety: We init new info in engine_alter_table() let new_info = self.new_info.as_ref().unwrap(); let table_name = &self.data.request.table_name; @@ -249,6 +237,8 @@ impl AlterMitoTable { new_info ); + // It is possible that we write the manifest multiple times and bump the manifest + // version, but it is still correct as we always write the new table info. 
self.table .manifest() .update(TableMetaActionList::with_action(TableMetaAction::Change( @@ -278,35 +268,21 @@ impl AlterMitoTable { .insert(table_ref.to_string(), self.table.clone()); } - Ok(Status::Done) + Ok(self.table.clone()) } - fn init_new_info(&mut self, current_info: &TableInfo) -> Result<()> { + fn init_new_info_and_op(&mut self, current_info: &TableInfo) -> Result<()> { if self.new_info.is_some() { return Ok(()); } - let table_name = &current_info.name; - let mut new_info = TableInfo::clone(current_info); - // setup new table info - match &self.data.request.alter_kind { - AlterKind::RenameTable { new_table_name } => { - new_info.name = new_table_name.clone(); - } - AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => { - let table_meta = &current_info.meta; - let new_meta = table_meta - .builder_with_alter_kind(table_name, &self.data.request.alter_kind) - .map_err(Error::from_error_ext)? - .build() - .context(BuildTableMetaSnafu { table_name })?; - new_info.meta = new_meta; - } - } - // Increase version of the table. - new_info.ident.version = current_info.ident.version + 1; + let (new_info, alter_op) = self + .table + .info_and_op_for_alter(current_info, &self.data.request.alter_kind) + .map_err(Error::from_error_ext)?; self.new_info = Some(new_info); + self.alter_op = alter_op; Ok(()) } @@ -315,12 +291,10 @@ impl AlterMitoTable { /// Represents each step while altering table in the mito engine. #[derive(Debug, Serialize, Deserialize)] enum AlterTableState { - /// Prepare to alter table. + /// Prepare to alter the table. Prepare, - /// Alter regions. - AlterRegions, - /// Update table manifest. - UpdateTableManifest, + /// Engine alters the table. + EngineAlterTable, } /// Serializable data of [AlterMitoTable]. 
diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index 121cc6fedd..59d6bdf919 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{Context, Error, LockKey, Procedure, ProcedureManager, Result, Status}; +use common_telemetry::metric::Timer; use datatypes::schema::{Schema, SchemaRef}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -28,22 +29,20 @@ use store_api::storage::{ use table::engine::{region_id, table_dir}; use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType}; use table::requests::CreateTableRequest; +use table::TableRef; use crate::engine::{self, MitoEngineInner, TableReference}; use crate::error::{ BuildRegionDescriptorSnafu, BuildTableInfoSnafu, BuildTableMetaSnafu, InvalidRawSchemaSnafu, TableExistsSnafu, }; +use crate::metrics; use crate::table::MitoTable; /// Procedure to create a [MitoTable]. pub(crate) struct CreateMitoTable { - data: CreateTableData, - engine_inner: Arc>, - /// Created regions of the table. - regions: HashMap, - /// Schema of the table. 
- table_schema: SchemaRef, + creator: TableCreator, + _timer: Timer, } #[async_trait] @@ -53,21 +52,21 @@ impl Procedure for CreateMitoTable { } async fn execute(&mut self, _ctx: &Context) -> Result { - match self.data.state { + match self.creator.data.state { CreateTableState::Prepare => self.on_prepare(), - CreateTableState::CreateRegions => self.on_create_regions().await, - CreateTableState::WriteTableManifest => self.on_write_table_manifest().await, + CreateTableState::EngineCreateTable => self.on_engine_create_table().await, } } fn dump(&self) -> Result { - let json = serde_json::to_string(&self.data).context(ToJsonSnafu)?; + let json = serde_json::to_string(&self.creator.data).context(ToJsonSnafu)?; Ok(json) } fn lock_key(&self) -> LockKey { - let table_ref = self.data.table_ref(); + let table_ref = self.creator.data.table_ref(); let keys = self + .creator .data .request .region_numbers @@ -85,18 +84,9 @@ impl CreateMitoTable { request: CreateTableRequest, engine_inner: Arc>, ) -> Result { - let table_schema = - Schema::try_from(request.schema.clone()).context(InvalidRawSchemaSnafu)?; - Ok(CreateMitoTable { - data: CreateTableData { - state: CreateTableState::Prepare, - request, - next_column_id: None, - }, - engine_inner, - regions: HashMap::new(), - table_schema: Arc::new(table_schema), + creator: TableCreator::new(request, engine_inner)?, + _timer: common_telemetry::timer!(metrics::MITO_CREATE_TABLE_ELAPSED), }) } @@ -125,20 +115,23 @@ impl CreateMitoTable { Schema::try_from(data.request.schema.clone()).context(InvalidRawSchemaSnafu)?; Ok(CreateMitoTable { - data, - engine_inner, - regions: HashMap::new(), - table_schema: Arc::new(table_schema), + creator: TableCreator { + data, + engine_inner, + regions: HashMap::new(), + table_schema: Arc::new(table_schema), + }, + _timer: common_telemetry::timer!(metrics::MITO_CREATE_TABLE_ELAPSED), }) } /// Checks whether the table exists. 
fn on_prepare(&mut self) -> Result { - let table_ref = self.data.table_ref(); - if self.engine_inner.get_table(&table_ref).is_some() { + let table_ref = self.creator.data.table_ref(); + if self.creator.engine_inner.get_table(&table_ref).is_some() { // If the table already exists. ensure!( - self.data.request.create_if_not_exists, + self.creator.data.request.create_if_not_exists, TableExistsSnafu { table_name: table_ref.to_string(), } @@ -147,31 +140,97 @@ impl CreateMitoTable { return Ok(Status::Done); } - self.data.state = CreateTableState::CreateRegions; + self.creator.data.state = CreateTableState::EngineCreateTable; Ok(Status::executing(true)) } - /// Creates regions for the table. - async fn on_create_regions(&mut self) -> Result { - let engine_ctx = EngineContext::default(); + /// Creates the table. + async fn on_engine_create_table(&mut self) -> Result { + // In this state, we can ensure we are able to create a new table. + let table_ref = self.creator.data.table_ref(); + + let _lock = self + .creator + .engine_inner + .table_mutex + .lock(table_ref.to_string()) + .await; + self.creator.create_table().await?; + + Ok(Status::Done) + } +} + +/// Mito table creator. +pub(crate) struct TableCreator { + data: CreateTableData, + engine_inner: Arc>, + /// Created regions of the table. + regions: HashMap, + /// Schema of the table. + table_schema: SchemaRef, +} + +impl TableCreator { + /// Returns a new [TableCreator]. + pub(crate) fn new( + request: CreateTableRequest, + engine_inner: Arc>, + ) -> Result { + let table_schema = + Schema::try_from(request.schema.clone()).context(InvalidRawSchemaSnafu)?; + + Ok(TableCreator { + data: CreateTableData { + state: CreateTableState::Prepare, + request, + next_column_id: None, + }, + engine_inner, + regions: HashMap::new(), + table_schema: Arc::new(table_schema), + }) + } + + /// Creates a new mito table or returns the table if it already exists. + /// + /// # Note + /// - Callers MUST acquire the table lock first. 
+ /// - The procedure may call this method multiple times. + pub(crate) async fn create_table(&mut self) -> Result { let table_dir = table_dir( &self.data.request.catalog_name, &self.data.request.schema_name, self.data.request.id, ); + + let table_ref = self.data.table_ref(); + // It is possible that the procedure retries `CREATE TABLE` many times, so we + // return the table if it exists. + if let Some(table) = self.engine_inner.get_table(&table_ref) { + return Ok(table.clone()); + } + + self.create_regions(&table_dir).await?; + + self.write_table_manifest(&table_dir).await + } + + /// Creates regions for the table. + async fn create_regions(&mut self, table_dir: &str) -> Result<()> { let table_options = &self.data.request.table_options; let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize); let ttl = table_options.ttl; let compaction_time_window = table_options.compaction_time_window; let open_opts = OpenOptions { - parent_dir: table_dir.clone(), + parent_dir: table_dir.to_string(), write_buffer_size, ttl, compaction_time_window, }; let create_opts = CreateOptions { - parent_dir: table_dir, + parent_dir: table_dir.to_string(), write_buffer_size, ttl, compaction_time_window, @@ -193,6 +252,7 @@ impl CreateMitoTable { self.data.next_column_id = Some(next_column_id); // Try to open all regions and collect the regions not exist. + let engine_ctx = EngineContext::default(); for number in &self.data.request.region_numbers { if self.regions.contains_key(number) { // Region is opened. @@ -226,55 +286,47 @@ impl CreateMitoTable { region_name, })?; - let region = self - .engine_inner - .storage_engine - .create_region(&engine_ctx, region_desc, &create_opts) - .await - .map_err(Error::from_error_ext)?; + let region = { + let _timer = common_telemetry::timer!(crate::metrics::MITO_CREATE_REGION_ELAPSED); + self.engine_inner + .storage_engine + .create_region(&engine_ctx, region_desc, &create_opts) + .await + .map_err(Error::from_error_ext)? 
+ }; self.regions.insert(*number, region); } - // All regions are created, moves to the next step. - self.data.state = CreateTableState::WriteTableManifest; - - Ok(Status::executing(true)) + Ok(()) } /// Writes metadata to the table manifest. - async fn on_write_table_manifest(&mut self) -> Result { - let table_dir = table_dir( - &self.data.request.catalog_name, - &self.data.request.schema_name, - self.data.request.id, - ); + async fn write_table_manifest(&mut self, table_dir: &str) -> Result { // Try to open the table first, as the table manifest might already exist. let table_ref = self.data.table_ref(); if let Some((manifest, table_info)) = self .engine_inner - .recover_table_manifest_and_info(&self.data.request.table_name, &table_dir) + .recover_table_manifest_and_info(&self.data.request.table_name, table_dir) .await? { let table = Arc::new(MitoTable::new(table_info, self.regions.clone(), manifest)); - let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string()); self.engine_inner .tables - .insert(table_ref.to_string(), table); - return Ok(Status::Done); + .insert(table_ref.to_string(), table.clone()); + return Ok(table); } // We need to persist the table manifest and create the table. - let table = self.write_manifest_and_create_table(&table_dir).await?; + let table = self.write_manifest_and_create_table(table_dir).await?; let table = Arc::new(table); - let _lock = self.engine_inner.table_mutex.lock(table_ref.to_string()); self.engine_inner .tables - .insert(table_ref.to_string(), table); + .insert(table_ref.to_string(), table.clone()); - Ok(Status::Done) + Ok(table) } /// Write metadata to the table manifest and return the created table. @@ -282,7 +334,7 @@ impl CreateMitoTable { &self, table_dir: &str, ) -> Result> { - // Safety: We are in `WriteTableManifest` state. + // Safety: next_column_id is always Some when calling this method. 
let next_column_id = self.data.next_column_id.unwrap(); let table_meta = TableMetaBuilder::default() @@ -325,12 +377,10 @@ impl CreateMitoTable { /// Represents each step while creating table in the mito engine. #[derive(Debug, Serialize, Deserialize)] enum CreateTableState { - /// Prepare to create region. + /// Prepare to create the table. Prepare, - /// Create regions. - CreateRegions, - /// Write metadata to table manifest. - WriteTableManifest, + /// Engine creates the table. + EngineCreateTable, } /// Serializable data of [CreateMitoTable]. @@ -340,7 +390,7 @@ struct CreateTableData { request: CreateTableRequest, /// Next id for column. /// - /// Available in [CreateTableState::WriteTableManifest] state. + /// Set by [TableCreator::create_regions]. next_column_id: Option, } diff --git a/src/mito/src/engine/procedure/drop.rs b/src/mito/src/engine/procedure/drop.rs index ec93452094..2beb92088b 100644 --- a/src/mito/src/engine/procedure/drop.rs +++ b/src/mito/src/engine/procedure/drop.rs @@ -43,7 +43,14 @@ impl Procedure for DropMitoTable { async fn execute(&mut self, _ctx: &Context) -> Result { match self.data.state { DropTableState::Prepare => self.on_prepare(), - DropTableState::CloseRegions => self.on_close_regions().await, + DropTableState::EngineDropTable => { + self.engine_inner + .drop_table(self.data.request.clone()) + .await + .map_err(Error::from_error_ext)?; + + Ok(Status::Done) + } } } @@ -120,42 +127,19 @@ impl DropMitoTable { /// Prepare table info. fn on_prepare(&mut self) -> Result { - self.data.state = DropTableState::CloseRegions; + self.data.state = DropTableState::EngineDropTable; Ok(Status::executing(true)) } - - /// Close all regions. - async fn on_close_regions(&mut self) -> Result { - // Remove the table from the engine to avoid further access from users. 
- let table_ref = self.data.table_ref(); - - let _lock = self - .engine_inner - .table_mutex - .lock(table_ref.to_string()) - .await; - self.engine_inner.tables.remove(&table_ref.to_string()); - - // Close the table to close all regions. Closing a region is idempotent. - if let Some(table) = &self.table { - table.close().await.map_err(Error::from_error_ext)?; - } - - // TODO(yingwen): Currently, DROP TABLE doesn't remove data. We can - // write a drop meta update to the table and remove all files in the - // background. - Ok(Status::Done) - } } /// Represents each step while dropping table in the mito engine. #[derive(Debug, Serialize, Deserialize)] enum DropTableState { - /// Prepare to drop table. + /// Prepare to drop the table. Prepare, - /// Close regions of this table. - CloseRegions, + /// Engine drops the table. + EngineDropTable, } /// Serializable data of [DropMitoTable]. diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index 5ded50169e..369317a304 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -31,9 +31,12 @@ use storage::region::RegionImpl; use storage::EngineImpl; use store_api::manifest::Manifest; use store_api::storage::ReadContext; +use table::engine::region_id; +use table::metadata::TableType; use table::requests::{ AddColumnRequest, AlterKind, DeleteRequest, FlushTableRequest, TableOptions, }; +use table::Table; use super::*; use crate::table::test_util::{ diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index ae4a99f6ae..87dcced42f 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -43,7 +43,7 @@ use table::error::{ InvalidTableSnafu, RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu, }; use table::metadata::{ - FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, + FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, TableVersion, }; use table::requests::{ AddColumnRequest, AlterKind, 
AlterTableRequest, DeleteRequest, InsertRequest, @@ -222,29 +222,14 @@ impl Table for MitoTable { let _lock = self.alter_lock.lock().await; let table_info = self.table_info(); + let table_version = table_info.ident.version; + let (new_info, alter_op) = self.info_and_op_for_alter(&table_info, &req.alter_kind)?; let table_name = &table_info.name; - let mut new_info = TableInfo::clone(&*table_info); - // setup new table info - match &req.alter_kind { - AlterKind::RenameTable { new_table_name } => { - new_info.name = new_table_name.clone(); - } - AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => { - let table_meta = &table_info.meta; - let new_meta = table_meta - .builder_with_alter_kind(table_name, &req.alter_kind)? - .build() - .context(error::BuildTableMetaSnafu { table_name }) - .map_err(BoxedError::new) - .context(table_error::TableOperationSnafu)?; - new_info.meta = new_meta; - } + if let Some(alter_op) = &alter_op { + self.alter_regions(table_name, table_version, alter_op) + .await?; } - // Do create_alter_operation first to bump next_column_id in meta. - let alter_op = create_alter_operation(table_name, &req.alter_kind, &mut new_info.meta)?; - // Increase version of the table. - new_info.ident.version = table_info.ident.version + 1; // Persist the alteration to the manifest. logging::debug!( @@ -265,30 +250,6 @@ impl Table for MitoTable { .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; - if let Some(alter_op) = alter_op { - // TODO(yingwen): Error handling. Maybe the region need to provide a method to - // validate the request first. - let regions = self.regions(); - for region in regions.values() { - let region_meta = region.in_memory_metadata(); - let alter_req = AlterRequest { - operation: alter_op.clone(), - version: region_meta.version(), - }; - // Alter the region. 
- logging::debug!( - "start altering region {} of table {}, with request {:?}", - region.name(), - table_name, - alter_req, - ); - region - .alter(alter_req) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - } - } // Update in memory metadata of the table. self.set_table_info(new_info); @@ -562,6 +523,74 @@ impl MitoTable { // TODO(dennis): use manifest version in catalog ? (manifest::MIN_VERSION, manifest::MAX_VERSION) } + + /// For each region, alter it if its version is not updated. + pub(crate) async fn alter_regions( + &self, + table_name: &str, + table_version: TableVersion, + alter_op: &AlterOperation, + ) -> TableResult<()> { + let regions = self.regions(); + for region in regions.values() { + let region_meta = region.in_memory_metadata(); + if u64::from(region_meta.version()) > table_version { + // Region is already altered. + continue; + } + + let alter_req = AlterRequest { + operation: alter_op.clone(), + version: region_meta.version(), + }; + // Alter the region. + logging::debug!( + "start altering region {} of table {}, with request {:?}", + region.name(), + table_name, + alter_req, + ); + region + .alter(alter_req) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + } + + Ok(()) + } + + pub(crate) fn info_and_op_for_alter( + &self, + current_info: &TableInfo, + alter_kind: &AlterKind, + ) -> TableResult<(TableInfo, Option)> { + let table_name = ¤t_info.name; + let mut new_info = TableInfo::clone(current_info); + // setup new table info + match &alter_kind { + AlterKind::RenameTable { new_table_name } => { + new_info.name = new_table_name.clone(); + } + AlterKind::AddColumns { .. } | AlterKind::DropColumns { .. } => { + let table_meta = ¤t_info.meta; + let new_meta = table_meta + .builder_with_alter_kind(table_name, alter_kind)? 
+ .build() + .context(error::BuildTableMetaSnafu { table_name }) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + new_info.meta = new_meta; + } + } + // Increase version of the table. + new_info.ident.version = current_info.ident.version + 1; + + // Do create_alter_operation first to bump next_column_id in meta. + let alter_op = create_alter_operation(table_name, alter_kind, &mut new_info.meta)?; + + Ok((new_info, alter_op)) + } } /// Create [`AlterOperation`] according to given `alter_kind`.