diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 5c8e99211b..a5a64b0efd 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -23,7 +23,6 @@ use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSn use common_procedure::{Context, LockKey, Procedure, Status}; use common_telemetry::{info, warn}; use futures_util::future; -use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; @@ -32,11 +31,10 @@ use strum::AsRefStr; use table::metadata::TableId; use crate::ddl::utils::add_peer_context_if_needed; -use crate::ddl::{physical_table_metadata, DdlContext}; +use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result}; use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; -use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::find_leaders; @@ -128,7 +126,7 @@ impl AlterLogicalTablesProcedure { }); } - // Collects responses from all the alter region tasks. + // Collects responses from datanodes. let phy_raw_schemas = future::join_all(alter_region_tasks) .await .into_iter() @@ -163,44 +161,8 @@ impl AlterLogicalTablesProcedure { } pub(crate) async fn on_update_metadata(&mut self) -> Result { - if !self.data.physical_columns.is_empty() { - let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); - - // Generates new table info - let old_raw_table_info = physical_table_info.table_info.clone(); - let new_raw_table_info = physical_table_metadata::build_new_physical_table_info( - old_raw_table_info, - &self.data.physical_columns, - ); - - // Updates physical table's metadata - self.context - .table_metadata_manager - .update_table_info( - DeserializedValueWithBytes::from_inner(physical_table_info.clone()), - new_raw_table_info, - ) - .await?; - } - - let table_info_values = self.build_update_metadata()?; - let manager = &self.context.table_metadata_manager; - let chunk_size = manager.batch_update_table_info_value_chunk_size(); - if table_info_values.len() > chunk_size { - let chunks = table_info_values - .into_iter() - .chunks(chunk_size) - .into_iter() - .map(|check| check.collect::>()) - .collect::>(); - for chunk in chunks { - manager.batch_update_table_info_values(chunk).await?; - } - } else { - manager - .batch_update_table_info_values(table_info_values) - .await?; - } + self.update_physical_table_metadata().await?; + self.update_logical_tables_metadata().await?; self.data.state = AlterTablesState::InvalidateTableCache; Ok(Status::executing(true)) diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index b31e0a8799..3c8623ea24 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -13,16 +13,70 @@ // limitations under the License. use common_grpc_expr::alter_expr_to_request; +use common_telemetry::warn; +use itertools::Itertools; use snafu::ResultExt; use table::metadata::{RawTableInfo, TableInfo}; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::ddl::physical_table_metadata; use crate::error; use crate::error::{ConvertAlterTableRequestSnafu, Result}; use crate::key::table_info::TableInfoValue; +use crate::key::DeserializedValueWithBytes; use crate::rpc::ddl::AlterTableTask; impl AlterLogicalTablesProcedure { + pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> { + if self.data.physical_columns.is_empty() { + warn!("No physical columns found, leaving the physical table's schema unchanged when altering logical tables"); + return Ok(()); + } + + let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); + + // Generates new table info + let old_raw_table_info = physical_table_info.table_info.clone(); + let new_raw_table_info = physical_table_metadata::build_new_physical_table_info( + old_raw_table_info, + &self.data.physical_columns, + ); + + // Updates physical table's metadata + self.context + .table_metadata_manager + .update_table_info( + DeserializedValueWithBytes::from_inner(physical_table_info.clone()), + new_raw_table_info, + ) + .await?; + + Ok(()) + } + + pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> { + let table_info_values = self.build_update_metadata()?; + let manager = &self.context.table_metadata_manager; + let chunk_size = manager.batch_update_table_info_value_chunk_size(); + if table_info_values.len() > chunk_size { + let chunks = table_info_values + .into_iter() + .chunks(chunk_size) + .into_iter() + .map(|check| check.collect::>()) + .collect::>(); + for chunk in chunks { + manager.batch_update_table_info_values(chunk).await?; + } + } else { + manager + .batch_update_table_info_values(table_info_values) + .await?; + } + + Ok(()) + } + pub(crate) fn build_update_metadata(&self) -> Result> { let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len()); for (task, table) in self diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index df64b8e286..d050e7e3e4 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -12,45 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::ops::Deref; +mod check; +mod metadata; +mod region_request; +mod update_metadata; -use api::v1::region::region_request::Body as PbRegionRequest; -use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader}; use api::v1::CreateTableExpr; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; -use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{info, warn}; +use common_telemetry::warn; use futures_util::future::join_all; -use itertools::Itertools; use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; -use crate::cache_invalidator::Context; -use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path}; -use crate::ddl::{physical_table_metadata, DdlContext}; -use crate::error::{ - DecodeJsonSnafu, MetadataCorruptionSnafu, Result, TableAlreadyExistsSnafu, - TableInfoNotFoundSnafu, -}; -use crate::instruction::CacheIdent; -use crate::key::table_info::TableInfoValue; -use crate::key::table_name::TableNameKey; +use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; +use crate::ddl::DdlContext; +use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; use crate::key::table_route::TableRouteValue; -use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; -use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; -use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; -use crate::table_name::TableName; +use crate::rpc::router::{find_leaders, RegionRoute}; use crate::{metrics, ClusterId}; pub struct CreateLogicalTablesProcedure { @@ -67,17 +54,18 @@ impl CreateLogicalTablesProcedure { physical_table_id: TableId, context: DdlContext, ) -> Self { - let len = tasks.len(); - let data = CreateTablesData { - cluster_id, - state: CreateTablesState::Prepare, - tasks, - table_ids_already_exists: vec![None; len], - physical_table_id, - physical_region_numbers: vec![], - physical_columns: vec![], - }; - Self { context, data } + Self { + context, + data: CreateTablesData { + cluster_id, + state: CreateTablesState::Prepare, + tasks, + table_ids_already_exists: vec![], + physical_table_id, + physical_region_numbers: vec![], + physical_columns: vec![], + }, + } } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { @@ -96,91 +84,45 @@ impl CreateLogicalTablesProcedure { /// - Failed to check whether tables exist. /// - One of logical tables has existing, and the table creation task without setting `create_if_not_exists`. pub(crate) async fn on_prepare(&mut self) -> Result { - let manager = &self.context.table_metadata_manager; - + self.check_input_tasks()?; // Sets physical region numbers - let physical_table_id = self.data.physical_table_id(); - let physical_region_numbers = manager - .table_route_manager() - .get_physical_table_route(physical_table_id) - .await - .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?; - self.data - .set_physical_region_numbers(physical_region_numbers); - + self.fill_physical_table_info().await?; // Checks if the tables exist - let table_name_keys = self - .data - .all_create_table_exprs() - .iter() - .map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name)) - .collect::>(); - let already_exists_tables_ids = manager - .table_name_manager() - .batch_get(table_name_keys) - .await? - .iter() - .map(|x| x.map(|x| x.table_id())) - .collect::>(); - - // Validates the tasks - let tasks = &mut self.data.tasks; - for (task, table_id) in tasks.iter().zip(already_exists_tables_ids.iter()) { - if table_id.is_some() { - // If a table already exists, we just ignore it. - ensure!( - task.create_table.create_if_not_exists, - TableAlreadyExistsSnafu { - table_name: task.create_table.table_name.to_string(), - } - ); - continue; - } - } + self.check_tables_already_exist().await?; // If all tables already exist, returns the table_ids. - if already_exists_tables_ids.iter().all(Option::is_some) { + if self + .data + .table_ids_already_exists + .iter() + .all(Option::is_some) + { return Ok(Status::done_with_output( - already_exists_tables_ids - .into_iter() + self.data + .table_ids_already_exists + .drain(..) .flatten() .collect::>(), )); } // Allocates table ids and sort columns on their names. - for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) { - let table_id = if let Some(table_id) = table_id { - *table_id - } else { - self.context - .table_metadata_allocator - .allocate_table_id(task) - .await? - }; - task.set_table_id(table_id); + self.allocate_table_ids().await?; - // sort columns in task - task.sort_columns(); - } - - self.data - .set_table_ids_already_exists(already_exists_tables_ids); self.data.state = CreateTablesState::DatanodeCreateRegions; Ok(Status::executing(true)) } pub async fn on_datanode_create_regions(&mut self) -> Result { - let physical_table_id = self.data.physical_table_id(); let (_, physical_table_route) = self .context .table_metadata_manager .table_route_manager() - .get_physical_table_route(physical_table_id) + .get_physical_table_route(self.data.physical_table_id) .await?; - let region_routes = &physical_table_route.region_routes; - self.create_regions(region_routes).await + self.create_regions(&physical_table_route.region_routes) + .await } /// Creates table metadata for logical tables and update corresponding physical @@ -189,179 +131,54 @@ impl CreateLogicalTablesProcedure { /// Abort(not-retry): /// - Failed to create table metadata. pub async fn on_create_metadata(&mut self) -> Result { - let manager = &self.context.table_metadata_manager; - let physical_table_id = self.data.physical_table_id(); - let remaining_tasks = self.data.remaining_tasks(); - let num_tables = remaining_tasks.len(); - - if num_tables > 0 { - let chunk_size = manager.create_logical_tables_metadata_chunk_size(); - if num_tables > chunk_size { - let chunks = remaining_tasks - .into_iter() - .chunks(chunk_size) - .into_iter() - .map(|chunk| chunk.collect::>()) - .collect::>(); - for chunk in chunks { - manager.create_logical_tables_metadata(chunk).await?; - } - } else { - manager - .create_logical_tables_metadata(remaining_tasks) - .await?; - } - } - - // The `table_id` MUST be collected after the [Prepare::Prepare], - // ensures the all `table_id`s have been allocated. - let table_ids = self - .data - .tasks - .iter() - .map(|task| task.table_info.ident.table_id) - .collect::>(); - - if !self.data.physical_columns.is_empty() { - // fetch old physical table's info - let physical_table_info = self - .context - .table_metadata_manager - .table_info_manager() - .get(self.data.physical_table_id) - .await? - .with_context(|| TableInfoNotFoundSnafu { - table: format!("table id - {}", self.data.physical_table_id), - })?; - - // generate new table info - let new_table_info = self - .data - .build_new_physical_table_info(&physical_table_info); - - let physical_table_name = TableName::new( - &new_table_info.catalog_name, - &new_table_info.schema_name, - &new_table_info.name, - ); - - // update physical table's metadata - self.context - .table_metadata_manager - .update_table_info(physical_table_info, new_table_info) - .await?; - - // invalid table cache - self.context - .cache_invalidator - .invalidate( - &Context::default(), - vec![ - CacheIdent::TableId(self.data.physical_table_id), - CacheIdent::TableName(physical_table_name), - ], - ) - .await?; - } else { - warn!("No physical columns found, leaving the physical table's schema unchanged"); - } - - info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}"); + self.update_physical_table_metadata().await?; + let table_ids = self.create_logical_tables_metadata().await?; Ok(Status::done_with_output(table_ids)) } - fn create_region_request_builder( - &self, - physical_table_id: TableId, - task: &CreateTableTask, - ) -> Result { - let create_expr = &task.create_table; - let template = build_template(create_expr)?; - Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) - } - - fn one_datanode_region_requests( - &self, - datanode: &Peer, - region_routes: &[RegionRoute], - ) -> Result { - let create_tables_data = &self.data; - let tasks = &create_tables_data.tasks; - let physical_table_id = create_tables_data.physical_table_id(); - let regions = find_leader_regions(region_routes, datanode); - let mut requests = Vec::with_capacity(tasks.len() * regions.len()); - - for task in tasks { - let create_table_expr = &task.create_table; - let catalog = &create_table_expr.catalog_name; - let schema = &create_table_expr.schema_name; - let logical_table_id = task.table_info.ident.table_id; - let storage_path = region_storage_path(catalog, schema); - let request_builder = self.create_region_request_builder(physical_table_id, task)?; - - for region_number in ®ions { - let region_id = RegionId::new(logical_table_id, *region_number); - let create_region_request = - request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?; - requests.push(create_region_request); - } - } - - Ok(CreateRequests { requests }) - } - async fn create_regions(&mut self, region_routes: &[RegionRoute]) -> Result { let leaders = find_leaders(region_routes); let mut create_region_tasks = Vec::with_capacity(leaders.len()); - for datanode in leaders { - let requester = self.context.datanode_manager.datanode(&datanode).await; - let creates = self.one_datanode_region_requests(&datanode, region_routes)?; - let request = RegionRequest { - header: Some(RegionRequestHeader { - tracing_context: TracingContext::from_current_span().to_w3c(), - ..Default::default() - }), - body: Some(PbRegionRequest::Creates(creates)), - }; + for peer in leaders { + let requester = self.context.datanode_manager.datanode(&peer).await; + let request = self.make_request(&peer, region_routes)?; + create_region_tasks.push(async move { requester .handle(request) .await - .map_err(add_peer_context_if_needed(datanode)) + .map_err(add_peer_context_if_needed(peer)) }); } - // collect response from datanodes - let raw_schemas = join_all(create_region_tasks) + // Collects response from datanodes. + let phy_raw_schemas = join_all(create_region_tasks) .await .into_iter() - .map(|response| { - response.map(|mut response| response.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)) - }) + .map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))) .collect::>>()?; - if raw_schemas.is_empty() { + if phy_raw_schemas.is_empty() { self.data.state = CreateTablesState::CreateMetadata; return Ok(Status::executing(false)); } - // verify all datanodes return the same raw schemas - // Safety: previous check ensures this vector is not empty. - let first = raw_schemas.first().unwrap(); + // Verify all the physical schemas are the same + // Safety: previous check ensures this vec is not empty + let first = phy_raw_schemas.first().unwrap(); ensure!( - raw_schemas.iter().all(|x| x == first), + phy_raw_schemas.iter().all(|x| x == first), MetadataCorruptionSnafu { - err_msg: "Raw schemas from datanodes are not the same" + err_msg: "The physical schemas from datanodes are not the same." } ); - // decode raw schemas and store it - if let Some(raw_schema) = first { - let physical_columns = - ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?; - self.data.physical_columns = physical_columns; + // Decodes the physical raw schemas + if let Some(phy_raw_schemas) = first { + self.data.physical_columns = + ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?; } else { warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); } @@ -405,7 +222,7 @@ impl Procedure for CreateLogicalTablesProcedure { let table_ref = self.data.tasks[0].table_ref(); lock_key.push(CatalogLock::Read(table_ref.catalog).into()); lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into()); - lock_key.push(TableLock::Write(self.data.physical_table_id()).into()); + lock_key.push(TableLock::Write(self.data.physical_table_id).into()); for task in &self.data.tasks { lock_key.push( @@ -437,18 +254,6 @@ impl CreateTablesData { &self.state } - fn physical_table_id(&self) -> TableId { - self.physical_table_id - } - - fn set_physical_region_numbers(&mut self, physical_region_numbers: Vec) { - self.physical_region_numbers = physical_region_numbers; - } - - fn set_table_ids_already_exists(&mut self, table_ids_already_exists: Vec>) { - self.table_ids_already_exists = table_ids_already_exists; - } - fn all_create_table_exprs(&self) -> Vec<&CreateTableExpr> { self.tasks .iter() @@ -480,21 +285,6 @@ impl CreateTablesData { }) .collect::>() } - - /// Generate the new physical table info. - /// - /// This method will consumes the physical columns. - fn build_new_physical_table_info( - &mut self, - old_table_info: &DeserializedValueWithBytes, - ) -> RawTableInfo { - let raw_table_info = old_table_info.deref().table_info.clone(); - - physical_table_metadata::build_new_physical_table_info( - raw_table_info, - &self.physical_columns, - ) - } } #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] diff --git a/src/common/meta/src/ddl/create_logical_tables/check.rs b/src/common/meta/src/ddl/create_logical_tables/check.rs new file mode 100644 index 0000000000..2f86b8ac44 --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/check.rs @@ -0,0 +1,81 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ensure; + +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::error::{CreateLogicalTablesInvalidArgumentsSnafu, Result, TableAlreadyExistsSnafu}; +use crate::key::table_name::TableNameKey; + +impl CreateLogicalTablesProcedure { + pub(crate) fn check_input_tasks(&self) -> Result<()> { + self.check_schema()?; + + Ok(()) + } + + pub(crate) async fn check_tables_already_exist(&mut self) -> Result<()> { + let table_name_keys = self + .data + .all_create_table_exprs() + .iter() + .map(|expr| TableNameKey::new(&expr.catalog_name, &expr.schema_name, &expr.table_name)) + .collect::>(); + let table_ids_already_exists = self + .context + .table_metadata_manager + .table_name_manager() + .batch_get(table_name_keys) + .await? + .iter() + .map(|x| x.map(|x| x.table_id())) + .collect::>(); + + self.data.table_ids_already_exists = table_ids_already_exists; + + // Validates the tasks + let tasks = &mut self.data.tasks; + for (task, table_id) in tasks.iter().zip(self.data.table_ids_already_exists.iter()) { + if table_id.is_some() { + // If a table already exists, we just ignore it. + ensure!( + task.create_table.create_if_not_exists, + TableAlreadyExistsSnafu { + table_name: task.create_table.table_name.to_string(), + } + ); + continue; + } + } + + Ok(()) + } + + // Checks if the schemas of the tasks are the same + fn check_schema(&self) -> Result<()> { + let is_same_schema = self.data.tasks.windows(2).all(|pair| { + pair[0].create_table.catalog_name == pair[1].create_table.catalog_name + && pair[0].create_table.schema_name == pair[1].create_table.schema_name + }); + + ensure!( + is_same_schema, + CreateLogicalTablesInvalidArgumentsSnafu { + err_msg: "Schemas of the tasks are not the same" + } + ); + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_logical_tables/metadata.rs b/src/common/meta/src/ddl/create_logical_tables/metadata.rs new file mode 100644 index 0000000000..2d61719d39 --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/metadata.rs @@ -0,0 +1,57 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::error::Result; +use crate::key::table_route::TableRouteValue; + +impl CreateLogicalTablesProcedure { + pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> { + let physical_region_numbers = self + .context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(self.data.physical_table_id) + .await + .map(|(_, route)| TableRouteValue::Physical(route).region_numbers())?; + + self.data.physical_region_numbers = physical_region_numbers; + + Ok(()) + } + + pub(crate) async fn allocate_table_ids(&mut self) -> Result<()> { + for (task, table_id) in self + .data + .tasks + .iter_mut() + .zip(self.data.table_ids_already_exists.iter()) + { + let table_id = if let Some(table_id) = table_id { + *table_id + } else { + self.context + .table_metadata_allocator + .allocate_table_id(task) + .await? + }; + task.set_table_id(table_id); + + // sort columns in task + task.sort_columns(); + } + + Ok(()) + } +} diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs new file mode 100644 index 0000000000..bc0d290c4e --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader}; +use common_telemetry::tracing_context::TracingContext; +use store_api::storage::RegionId; + +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; +use crate::ddl::utils::region_storage_path; +use crate::error::Result; +use crate::peer::Peer; +use crate::rpc::ddl::CreateTableTask; +use crate::rpc::router::{find_leader_regions, RegionRoute}; + +impl CreateLogicalTablesProcedure { + pub(crate) fn make_request( + &self, + peer: &Peer, + region_routes: &[RegionRoute], + ) -> Result { + let tasks = &self.data.tasks; + let regions_on_this_peer = find_leader_regions(region_routes, peer); + let mut requests = Vec::with_capacity(tasks.len() * regions_on_this_peer.len()); + for task in tasks { + let create_table_expr = &task.create_table; + let catalog = &create_table_expr.catalog_name; + let schema = &create_table_expr.schema_name; + let logical_table_id = task.table_info.ident.table_id; + let storage_path = region_storage_path(catalog, schema); + let request_builder = self.create_region_request_builder(task)?; + + for region_number in ®ions_on_this_peer { + let region_id = RegionId::new(logical_table_id, *region_number); + let one_region_request = + request_builder.build_one(region_id, storage_path.clone(), &HashMap::new())?; + requests.push(one_region_request); + } + } + + Ok(RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Creates(CreateRequests { requests })), + }) + } + + fn create_region_request_builder( + &self, + task: &CreateTableTask, + ) -> Result { + let create_expr = &task.create_table; + let template = build_template(create_expr)?; + Ok(CreateRequestBuilder::new( + template, + Some(self.data.physical_table_id), + )) + } +} diff --git a/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs new file mode 100644 index 0000000000..6500350727 --- /dev/null +++ b/src/common/meta/src/ddl/create_logical_tables/update_metadata.rs @@ -0,0 +1,128 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; + +use common_telemetry::{info, warn}; +use itertools::Itertools; +use snafu::OptionExt; +use table::metadata::TableId; + +use crate::cache_invalidator::Context; +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; +use crate::ddl::physical_table_metadata; +use crate::error::{Result, TableInfoNotFoundSnafu}; +use crate::instruction::CacheIdent; +use crate::table_name::TableName; + +impl CreateLogicalTablesProcedure { + pub(crate) async fn update_physical_table_metadata(&mut self) -> Result<()> { + if self.data.physical_columns.is_empty() { + warn!("No physical columns found, leaving the physical table's schema unchanged when creating logical tables"); + return Ok(()); + } + + // Fetches old physical table's info + let physical_table_info = self + .context + .table_metadata_manager + .table_info_manager() + .get(self.data.physical_table_id) + .await? + .with_context(|| TableInfoNotFoundSnafu { + table: format!("table id - {}", self.data.physical_table_id), + })?; + + // Generates new table info + let raw_table_info = physical_table_info.deref().table_info.clone(); + + let new_table_info = physical_table_metadata::build_new_physical_table_info( + raw_table_info, + &self.data.physical_columns, + ); + + let physical_table_name = TableName::new( + &new_table_info.catalog_name, + &new_table_info.schema_name, + &new_table_info.name, + ); + + // Update physical table's metadata + self.context + .table_metadata_manager + .update_table_info(physical_table_info, new_table_info) + .await?; + + // Invalid physical table cache + self.context + .cache_invalidator + .invalidate( + &Context::default(), + vec![ + CacheIdent::TableId(self.data.physical_table_id), + CacheIdent::TableName(physical_table_name), + ], + ) + .await?; + + Ok(()) + } + + pub(crate) async fn create_logical_tables_metadata(&mut self) -> Result> { + let remaining_tasks = self.data.remaining_tasks(); + let num_tables = remaining_tasks.len(); + + if num_tables > 0 { + let chunk_size = self + .context + .table_metadata_manager + .create_logical_tables_metadata_chunk_size(); + if num_tables > chunk_size { + let chunks = remaining_tasks + .into_iter() + .chunks(chunk_size) + .into_iter() + .map(|chunk| chunk.collect::>()) + .collect::>(); + for chunk in chunks { + self.context + .table_metadata_manager + .create_logical_tables_metadata(chunk) + .await?; + } + } else { + self.context + .table_metadata_manager + .create_logical_tables_metadata(remaining_tasks) + .await?; + } + } + + // The `table_id` MUST be collected after the [Prepare::Prepare], + // ensures the all `table_id`s have been allocated. + let table_ids = self + .data + .tasks + .iter() + .map(|task| task.table_info.ident.table_id) + .collect::>(); + + info!( + "Created {num_tables} tables {table_ids:?} metadata for physical table {}", + self.data.physical_table_id + ); + + Ok(table_ids) + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ff067aa609..a4728676ba 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -403,6 +403,9 @@ pub enum Error { #[snafu(display("Alter logical tables invalid arguments: {}", err_msg))] AlterLogicalTablesInvalidArguments { err_msg: String, location: Location }, + + #[snafu(display("Create logical tables invalid arguments: {}", err_msg))] + CreateLogicalTablesInvalidArguments { err_msg: String, location: Location }, } pub type Result = std::result::Result; @@ -463,7 +466,8 @@ impl ErrorExt for Error { | PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } - | AlterLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments, + | AlterLogicalTablesInvalidArguments { .. } + | CreateLogicalTablesInvalidArguments { .. } => StatusCode::InvalidArguments, TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 49f1ef6895..5db1d3b401 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -329,22 +329,6 @@ impl StatementExecutor { name: "alter table" } ); - ensure!( - alter_table_exprs - .windows(2) - .all(|expr| expr[0].catalog_name == expr[1].catalog_name), - DdlWithMultiCatalogsSnafu { - ddl_name: "alter tables", - } - ); - ensure!( - alter_table_exprs - .windows(2) - .all(|expr| expr[0].schema_name == expr[1].schema_name), - DdlWithMultiSchemasSnafu { - ddl_name: "alter tables", - } - ); self.alter_logical_tables_procedure(alter_table_exprs) .await?;