diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 124330ee47..39c7982eae 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use catalog::kvbackend::CachedMetaKvBackendBuilder; +use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager}; use clap::Parser; use client::client_manager::DatanodeClients; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -248,11 +248,12 @@ impl StartCommand { .build(); let cached_meta_backend = Arc::new(cached_meta_backend); + let catalog_manager = + KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone()); + let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new( - cached_meta_backend.clone(), - )), + Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())), ]); let heartbeat_task = HeartbeatTask::new( @@ -263,10 +264,11 @@ impl StartCommand { let mut instance = FrontendBuilder::new( cached_meta_backend.clone(), + catalog_manager.clone(), Arc::new(DatanodeClients::default()), meta_client, ) - .with_cache_invalidator(cached_meta_backend) + .with_cache_invalidator(catalog_manager.clone()) .with_plugin(plugins.clone()) .with_heartbeat_task(heartbeat_task) .try_build() diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index c8b0385cfe..4082b8aa9e 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,10 +16,11 @@ use std::sync::Arc; use std::{fs, path}; use async_trait::async_trait; +use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig}; -use common_meta::cache_invalidator::DummyCacheInvalidator; +use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::ProcedureExecutorRef; @@ -399,6 +400,9 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; + let catalog_manager = + KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator)); + let builder = DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); let datanode = builder.build().await.context(StartDatanodeSnafu)?; @@ -429,15 +433,22 @@ impl StartCommand { table_metadata_manager, procedure_manager.clone(), datanode_manager.clone(), + catalog_manager.clone(), table_meta_allocator, ) .await?; - let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor) - .with_plugin(fe_plugins.clone()) - .try_build() - .await - .context(StartFrontendSnafu)?; + let mut frontend = FrontendBuilder::new( + kv_backend, + catalog_manager.clone(), + datanode_manager, + ddl_task_executor, + ) + .with_plugin(fe_plugins.clone()) + .with_cache_invalidator(catalog_manager) + .try_build() + .await + .context(StartFrontendSnafu)?; let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins) .build() @@ -459,13 +470,14 @@ impl StartCommand { table_metadata_manager: TableMetadataManagerRef, procedure_manager: ProcedureManagerRef, datanode_manager: DatanodeManagerRef, + cache_invalidator: CacheInvalidatorRef, table_meta_allocator: TableMetadataAllocatorRef, ) -> Result { let procedure_executor: ProcedureExecutorRef = Arc::new( DdlManager::try_new( procedure_manager, datanode_manager, - Arc::new(DummyCacheInvalidator), + cache_invalidator, table_metadata_manager, table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index efef95916a..4f2fd6f6df 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -35,6 +35,7 @@ pub mod create_table; mod create_table_template; pub mod drop_database; pub mod drop_table; +mod physical_table_metadata; pub mod table_meta; #[cfg(any(test, feature = "testing"))] pub mod test_util; diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index a109919d20..b2b8b6858b 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -15,25 +15,29 @@ mod check; mod metadata; mod region_request; +mod table_cache_keys; mod update_metadata; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context, LockKey, Procedure, Status}; +use common_telemetry::{info, warn}; use futures_util::future; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use strum::AsRefStr; use table::metadata::TableId; use crate::ddl::utils::add_peer_context_if_needed; -use crate::ddl::DdlContext; -use crate::error::{Error, Result}; -use crate::instruction::CacheIdent; +use crate::ddl::{physical_table_metadata, DdlContext}; +use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result}; use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; -use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; +use crate::key::DeserializedValueWithBytes; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::{find_leader_regions, find_leaders}; use crate::{cache_invalidator, metrics, ClusterId}; @@ -60,8 +64,9 @@ impl AlterLogicalTablesProcedure { tasks, table_info_values: vec![], physical_table_id, + physical_table_info: None, physical_table_route: None, - cache_invalidate_keys: vec![], + physical_columns: vec![], }, } } @@ -79,11 +84,24 @@ impl AlterLogicalTablesProcedure { // Checks the physical table, must after [fill_table_info_values] self.check_physical_table().await?; // Fills the physical table info - self.fill_physical_table_route().await?; - // Filter the tasks + self.fill_physical_table_info().await?; + // Filter the finished tasks let finished_tasks = self.check_finished_tasks()?; - if finished_tasks.iter().all(|x| *x) { - return Ok(Status::done()); + let already_finished_count = finished_tasks + .iter() + .map(|x| if *x { 1 } else { 0 }) + .sum::(); + let apply_tasks_count = self.data.tasks.len(); + if already_finished_count == apply_tasks_count { + info!("All the alter tasks are finished, will skip the procedure."); + // Re-invalidate the table cache + self.data.state = AlterTablesState::InvalidateTableCache; + return Ok(Status::executing(true)); + } else if already_finished_count > 0 { + info!( + "There are {} alter tasks, {} of them were already finished.", + apply_tasks_count, already_finished_count + ); } self.filter_task(&finished_tasks)?; @@ -116,17 +134,61 @@ impl AlterLogicalTablesProcedure { } } - future::join_all(alter_region_tasks) + // Collects responses from all the alter region tasks. + let phy_raw_schemas = future::join_all(alter_region_tasks) .await .into_iter() + .map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY))) .collect::>>()?; - self.data.state = AlterTablesState::UpdateMetadata; + if phy_raw_schemas.is_empty() { + self.data.state = AlterTablesState::UpdateMetadata; + return Ok(Status::executing(true)); + } + // Verify all the physical schemas are the same + // Safety: previous check ensures this vec is not empty + let first = phy_raw_schemas.first().unwrap(); + ensure!( + phy_raw_schemas.iter().all(|x| x == first), + MetadataCorruptionSnafu { + err_msg: "The physical schemas from datanodes are not the same." + } + ); + + // Decodes the physical raw schemas + if let Some(phy_raw_schema) = first { + self.data.physical_columns = + ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?; + } else { + warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); + } + + self.data.state = AlterTablesState::UpdateMetadata; Ok(Status::executing(true)) } pub(crate) async fn on_update_metadata(&mut self) -> Result { + if !self.data.physical_columns.is_empty() { + let physical_table_info = self.data.physical_table_info.as_ref().unwrap(); + + // Generates new table info + let old_raw_table_info = physical_table_info.table_info.clone(); + let new_raw_table_info = physical_table_metadata::build_new_physical_table_info( + old_raw_table_info, + &self.data.physical_columns, + ); + + // Updates physical table's metadata + self.context + .table_metadata_manager + .update_table_info( + DeserializedValueWithBytes::from_inner(physical_table_info.clone()), + new_raw_table_info, + ) + .await?; + } + let table_info_values = self.build_update_metadata()?; let manager = &self.context.table_metadata_manager; let chunk_size = manager.batch_update_table_info_value_chunk_size(); @@ -151,15 +213,12 @@ impl AlterLogicalTablesProcedure { } pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result { - let to_invalidate = self - .data - .cache_invalidate_keys - .drain(..) - .map(CacheIdent::TableId) - .collect::>(); + let ctx = cache_invalidator::Context::default(); + let to_invalidate = self.build_table_cache_keys_to_invalidate(); + self.context .cache_invalidator - .invalidate(&cache_invalidator::Context::default(), to_invalidate) + .invalidate(&ctx, to_invalidate) .await?; Ok(Status::done()) } @@ -212,17 +271,13 @@ impl Procedure for AlterLogicalTablesProcedure { lock_key.push(CatalogLock::Read(table_ref.catalog).into()); lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into()); lock_key.push(TableLock::Write(self.data.physical_table_id).into()); + lock_key.extend( + self.data + .table_info_values + .iter() + .map(|table| TableLock::Write(table.table_info.ident.table_id).into()), + ); - for task in &self.data.tasks { - lock_key.push( - TableNameLock::new( - &task.alter_table.catalog_name, - &task.alter_table.schema_name, - &task.alter_table.table_name, - ) - .into(), - ); - } LockKey::new(lock_key) } } @@ -237,8 +292,9 @@ pub struct AlterTablesData { table_info_values: Vec, /// Physical table info physical_table_id: TableId, + physical_table_info: Option, physical_table_route: Option, - cache_invalidate_keys: Vec, + physical_columns: Vec, } #[derive(Debug, Serialize, Deserialize, AsRefStr)] diff --git a/src/common/meta/src/ddl/alter_logical_tables/metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/metadata.rs index 74747abb85..60dde7a634 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/metadata.rs @@ -17,9 +17,13 @@ use snafu::OptionExt; use table::metadata::TableId; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; -use crate::error::{Result, TableInfoNotFoundSnafu, TableNotFoundSnafu}; +use crate::error::{ + AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu, + TableRouteNotFoundSnafu, +}; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; +use crate::key::table_route::TableRouteValue; use crate::rpc::ddl::AlterTableTask; impl AlterLogicalTablesProcedure { @@ -46,21 +50,38 @@ impl AlterLogicalTablesProcedure { } }) .collect(); - self.data.cache_invalidate_keys = self - .data - .table_info_values - .iter() - .map(|table| table.table_info.ident.table_id) - .collect(); Ok(()) } - pub(crate) async fn fill_physical_table_route(&mut self) -> Result<()> { - let table_route_manager = self.context.table_metadata_manager.table_route_manager(); - let (_, physical_table_route) = table_route_manager - .get_physical_table_route(self.data.physical_table_id) + pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> { + let (physical_table_info, physical_table_route) = self + .context + .table_metadata_manager + .get_full_table_info(self.data.physical_table_id) .await?; + + let physical_table_info = physical_table_info + .with_context(|| TableInfoNotFoundSnafu { + table: format!("table id - {}", self.data.physical_table_id), + })? + .into_inner(); + let physical_table_route = physical_table_route + .context(TableRouteNotFoundSnafu { + table_id: self.data.physical_table_id, + })? + .into_inner(); + + self.data.physical_table_info = Some(physical_table_info); + let TableRouteValue::Physical(physical_table_route) = physical_table_route else { + return AlterLogicalTablesInvalidArgumentsSnafu { + err_msg: format!( + "expected a physical table but got a logical table: {:?}", + self.data.physical_table_id + ), + } + .fail(); + }; self.data.physical_table_route = Some(physical_table_route); Ok(()) @@ -87,7 +108,7 @@ impl AlterLogicalTablesProcedure { table_info_map .remove(table_id) .with_context(|| TableInfoNotFoundSnafu { - table_name: extract_table_name(task), + table: extract_table_name(task), })?; table_info_values.push(table_info_value); } diff --git a/src/common/meta/src/ddl/alter_logical_tables/table_cache_keys.rs b/src/common/meta/src/ddl/alter_logical_tables/table_cache_keys.rs new file mode 100644 index 0000000000..23cf22e2c0 --- /dev/null +++ b/src/common/meta/src/ddl/alter_logical_tables/table_cache_keys.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use table::metadata::RawTableInfo; + +use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::instruction::CacheIdent; +use crate::table_name::TableName; + +impl AlterLogicalTablesProcedure { + pub(crate) fn build_table_cache_keys_to_invalidate(&self) -> Vec { + let mut cache_keys = self + .data + .table_info_values + .iter() + .flat_map(|table| { + vec![ + CacheIdent::TableId(table.table_info.ident.table_id), + CacheIdent::TableName(extract_table_name(&table.table_info)), + ] + }) + .collect::>(); + cache_keys.push(CacheIdent::TableId(self.data.physical_table_id)); + // Safety: physical_table_info already filled in previous steps + let physical_table_info = &self.data.physical_table_info.as_ref().unwrap().table_info; + cache_keys.push(CacheIdent::TableName(extract_table_name( + physical_table_info, + ))); + + cache_keys + } +} + +fn extract_table_name(table_info: &RawTableInfo) -> TableName { + TableName::new( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name, + ) +} diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index e9ba0e7222..b31e0a8799 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -23,10 +23,14 @@ use crate::key::table_info::TableInfoValue; use crate::rpc::ddl::AlterTableTask; impl AlterLogicalTablesProcedure { - pub(crate) fn build_update_metadata(&mut self) -> Result> { + pub(crate) fn build_update_metadata(&self) -> Result> { let mut table_info_values_to_update = Vec::with_capacity(self.data.tasks.len()); - let table_info_values = std::mem::take(&mut self.data.table_info_values); - for (task, table) in self.data.tasks.iter().zip(table_info_values.into_iter()) { + for (task, table) in self + .data + .tasks + .iter() + .zip(self.data.table_info_values.iter()) + { table_info_values_to_update.push(self.build_new_table_info(task, table)?); } @@ -36,7 +40,7 @@ impl AlterLogicalTablesProcedure { fn build_new_table_info( &self, task: &AlterTableTask, - table: TableInfoValue, + table: &TableInfoValue, ) -> Result<(TableInfoValue, RawTableInfo)> { // Builds new_meta let table_info = TableInfo::try_from(table.table_info.clone()) @@ -61,6 +65,6 @@ impl AlterLogicalTablesProcedure { let mut raw_table_info = RawTableInfo::from(new_table); raw_table_info.sort_columns(); - Ok((table, raw_table_info)) + Ok((table.clone(), raw_table_info)) } } diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 80cba554c3..df64b8e286 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::ops::Deref; use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader}; -use api::v1::{CreateTableExpr, SemanticType}; +use api::v1::CreateTableExpr; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; @@ -36,7 +36,7 @@ use table::metadata::{RawTableInfo, TableId}; use crate::cache_invalidator::Context; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path}; -use crate::ddl::DdlContext; +use crate::ddl::{physical_table_metadata, DdlContext}; use crate::error::{ DecodeJsonSnafu, MetadataCorruptionSnafu, Result, TableAlreadyExistsSnafu, TableInfoNotFoundSnafu, @@ -50,6 +50,7 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::table_name::TableName; use crate::{metrics, ClusterId}; pub struct CreateLogicalTablesProcedure { @@ -226,11 +227,11 @@ impl CreateLogicalTablesProcedure { let physical_table_info = self .context .table_metadata_manager - .get_full_table_info(self.data.physical_table_id) + .table_info_manager() + .get(self.data.physical_table_id) .await? - .0 - .context(TableInfoNotFoundSnafu { - table_name: format!("table id - {}", self.data.physical_table_id), + .with_context(|| TableInfoNotFoundSnafu { + table: format!("table id - {}", self.data.physical_table_id), })?; // generate new table info @@ -238,6 +239,12 @@ impl CreateLogicalTablesProcedure { .data .build_new_physical_table_info(&physical_table_info); + let physical_table_name = TableName::new( + &new_table_info.catalog_name, + &new_table_info.schema_name, + &new_table_info.name, + ); + // update physical table's metadata self.context .table_metadata_manager @@ -249,7 +256,10 @@ impl CreateLogicalTablesProcedure { .cache_invalidator .invalidate( &Context::default(), - vec![CacheIdent::TableId(self.data.physical_table_id)], + vec![ + CacheIdent::TableId(self.data.physical_table_id), + CacheIdent::TableName(physical_table_name), + ], ) .await?; } else { @@ -358,8 +368,7 @@ impl CreateLogicalTablesProcedure { self.data.state = CreateTablesState::CreateMetadata; - // Ensures the procedures after the crash start from the `DatanodeCreateRegions` stage. - Ok(Status::executing(false)) + Ok(Status::executing(true)) } } @@ -479,38 +488,12 @@ impl CreateTablesData { &mut self, old_table_info: &DeserializedValueWithBytes, ) -> RawTableInfo { - let mut raw_table_info = old_table_info.deref().table_info.clone(); + let raw_table_info = old_table_info.deref().table_info.clone(); - let existing_primary_key = raw_table_info - .meta - .schema - .column_schemas - .iter() - .map(|col| col.name.clone()) - .collect::>(); - let primary_key_indices = &mut raw_table_info.meta.primary_key_indices; - let value_indices = &mut raw_table_info.meta.value_indices; - value_indices.clear(); - let time_index = &mut raw_table_info.meta.schema.timestamp_index; - let columns = &mut raw_table_info.meta.schema.column_schemas; - columns.clear(); - - for (idx, col) in self.physical_columns.drain(..).enumerate() { - match col.semantic_type { - SemanticType::Tag => { - // push new primary key to the end. - if !existing_primary_key.contains(&col.column_schema.name) { - primary_key_indices.push(idx); - } - } - SemanticType::Field => value_indices.push(idx), - SemanticType::Timestamp => *time_index = Some(idx), - } - - columns.push(col.column_schema); - } - - raw_table_info + physical_table_metadata::build_new_physical_table_info( + raw_table_info, + &self.physical_columns, + ) } } diff --git a/src/common/meta/src/ddl/physical_table_metadata.rs b/src/common/meta/src/ddl/physical_table_metadata.rs new file mode 100644 index 0000000000..df66995bd8 --- /dev/null +++ b/src/common/meta/src/ddl/physical_table_metadata.rs @@ -0,0 +1,56 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use api::v1::SemanticType; +use store_api::metadata::ColumnMetadata; +use table::metadata::RawTableInfo; + +/// Generate the new physical table info. +pub(crate) fn build_new_physical_table_info( + mut raw_table_info: RawTableInfo, + physical_columns: &[ColumnMetadata], +) -> RawTableInfo { + let existing_columns = raw_table_info + .meta + .schema + .column_schemas + .iter() + .map(|col| col.name.clone()) + .collect::>(); + let primary_key_indices = &mut raw_table_info.meta.primary_key_indices; + let value_indices = &mut raw_table_info.meta.value_indices; + value_indices.clear(); + let time_index = &mut raw_table_info.meta.schema.timestamp_index; + let columns = &mut raw_table_info.meta.schema.column_schemas; + columns.clear(); + + for (idx, col) in physical_columns.iter().enumerate() { + match col.semantic_type { + SemanticType::Tag => { + // push new primary key to the end. + if !existing_columns.contains(&col.column_schema.name) { + primary_key_indices.push(idx); + } + } + SemanticType::Field => value_indices.push(idx), + SemanticType::Timestamp => *time_index = Some(idx), + } + + columns.push(col.column_schema.clone()); + } + + raw_table_info +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 2ee5ebc49e..059094d819 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -369,7 +369,7 @@ async fn handle_truncate_table_task( table_metadata_manager.get_full_table_info(table_id).await?; let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { - table_name: table_ref.to_string(), + table: table_ref.to_string(), })?; let table_route_value = @@ -421,7 +421,7 @@ async fn handle_alter_table_task( .get(table_id) .await? .with_context(|| error::TableInfoNotFoundSnafu { - table_name: table_ref.to_string(), + table: table_ref.to_string(), })?; let physical_table_id = ddl_manager @@ -439,7 +439,7 @@ async fn handle_alter_table_task( .get(physical_table_id) .await? .with_context(|| error::TableInfoNotFoundSnafu { - table_name: table_ref.to_string(), + table: table_ref.to_string(), })? .table_info; Some(( @@ -488,7 +488,7 @@ async fn handle_drop_table_task( .await?; let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu { - table_name: table_ref.to_string(), + table: table_ref.to_string(), })?; let table_route_value = diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index bc85bfe262..ff067aa609 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -89,11 +89,8 @@ pub enum Error { #[snafu(display("Unexpected sequence value: {}", err_msg))] UnexpectedSequenceValue { err_msg: String, location: Location }, - #[snafu(display("Table info not found: {}", table_name))] - TableInfoNotFound { - table_name: String, - location: Location, - }, + #[snafu(display("Table info not found: {}", table))] + TableInfoNotFound { table: String, location: Location }, #[snafu(display("Failed to register procedure loader, type name: {}", type_name))] RegisterProcedureLoader { diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index bb99c2c5ee..1464a11c27 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use catalog::kvbackend::KvBackendCatalogManager; +use catalog::CatalogManagerRef; use common_base::Plugins; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; use common_meta::datanode_manager::DatanodeManagerRef; @@ -41,6 +41,7 @@ use crate::script::ScriptExecutor; pub struct FrontendBuilder { kv_backend: KvBackendRef, cache_invalidator: Option, + catalog_manager: CatalogManagerRef, datanode_manager: DatanodeManagerRef, plugins: Option, procedure_executor: ProcedureExecutorRef, @@ -50,12 +51,14 @@ pub struct FrontendBuilder { impl FrontendBuilder { pub fn new( kv_backend: KvBackendRef, + catalog_manager: CatalogManagerRef, datanode_manager: DatanodeManagerRef, procedure_executor: ProcedureExecutorRef, ) -> Self { Self { kv_backend, cache_invalidator: None, + catalog_manager, datanode_manager, plugins: None, procedure_executor, @@ -89,29 +92,27 @@ impl FrontendBuilder { let datanode_manager = self.datanode_manager; let plugins = self.plugins.unwrap_or_default(); - let catalog_manager = KvBackendCatalogManager::new( - kv_backend.clone(), - self.cache_invalidator - .unwrap_or_else(|| Arc::new(DummyCacheInvalidator)), - ); - let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); + let cache_invalidator = self + .cache_invalidator + .unwrap_or_else(|| Arc::new(DummyCacheInvalidator)); + let region_query_handler = FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone()); let inserter = Arc::new(Inserter::new( - catalog_manager.clone(), + self.catalog_manager.clone(), partition_manager.clone(), datanode_manager.clone(), )); let deleter = Arc::new(Deleter::new( - catalog_manager.clone(), + self.catalog_manager.clone(), partition_manager.clone(), datanode_manager.clone(), )); let requester = Arc::new(Requester::new( - catalog_manager.clone(), + self.catalog_manager.clone(), partition_manager, datanode_manager.clone(), )); @@ -126,7 +127,7 @@ impl FrontendBuilder { )); let query_engine = QueryEngineFactory::new_with_plugins( - catalog_manager.clone(), + self.catalog_manager.clone(), Some(region_query_handler.clone()), Some(table_mutation_handler), Some(procedure_service_handler), @@ -135,22 +136,23 @@ impl FrontendBuilder { ) .query_engine(); - let script_executor = - Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + let script_executor = Arc::new( + ScriptExecutor::new(self.catalog_manager.clone(), query_engine.clone()).await?, + ); let statement_executor = Arc::new(StatementExecutor::new( - catalog_manager.clone(), + self.catalog_manager.clone(), query_engine.clone(), self.procedure_executor, kv_backend.clone(), - catalog_manager.clone(), + cache_invalidator, inserter.clone(), )); plugins.insert::(statement_executor.clone()); Ok(Instance { - catalog_manager, + catalog_manager: self.catalog_manager, script_executor, statement_executor, query_engine, diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 3caf1b63dd..d40ca633ff 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -303,7 +303,7 @@ async fn test_on_datanode_create_logical_regions() { }); let status = procedure.on_datanode_create_regions().await.unwrap(); - assert!(matches!(status, Status::Executing { persist: false })); + assert!(matches!(status, Status::Executing { persist: true })); assert!(matches!( procedure.data.state(), &CreateTablesState::CreateMetadata diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index e16aa82073..49f1ef6895 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -487,26 +487,26 @@ impl StatementExecutor { #[tracing::instrument(skip_all)] pub async fn alter_table_inner(&self, expr: AlterExpr) -> Result { let catalog_name = if expr.catalog_name.is_empty() { - DEFAULT_CATALOG_NAME + DEFAULT_CATALOG_NAME.to_string() } else { - expr.catalog_name.as_str() + expr.catalog_name.clone() }; let schema_name = if expr.schema_name.is_empty() { - DEFAULT_SCHEMA_NAME + DEFAULT_SCHEMA_NAME.to_string() } else { - expr.schema_name.as_str() + expr.schema_name.clone() }; - let table_name = expr.table_name.as_str(); + let table_name = expr.table_name.clone(); let table = self .catalog_manager - .table(catalog_name, schema_name, table_name) + .table(&catalog_name, &schema_name, &table_name) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name(catalog_name, schema_name, table_name), + table_name: format_full_table_name(&catalog_name, &schema_name, &table_name), })?; let table_id = table.table_info().ident.table_id; @@ -518,8 +518,54 @@ impl StatementExecutor { expr ); - let req = SubmitDdlTaskRequest { - task: DdlTask::new_alter_table(expr.clone()), + let physical_table_id = self + .table_metadata_manager + .table_route_manager() + .get_physical_table_id(table_id) + .await + .context(TableMetadataManagerSnafu)?; + + let (req, invalidate_keys) = if physical_table_id == table_id { + // This is physical table + let req = SubmitDdlTaskRequest { + task: DdlTask::new_alter_table(expr), + }; + + let invalidate_keys = vec![ + CacheIdent::TableId(table_id), + CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)), + ]; + + (req, invalidate_keys) + } else { + // This is logical table + let req = SubmitDdlTaskRequest { + task: DdlTask::new_alter_logical_tables(vec![expr]), + }; + + let mut invalidate_keys = vec![ + CacheIdent::TableId(physical_table_id), + CacheIdent::TableId(table_id), + CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)), + ]; + + let physical_table = self + .table_metadata_manager + .table_info_manager() + .get(physical_table_id) + .await + .context(TableMetadataManagerSnafu)? + .map(|x| x.into_inner()); + if let Some(physical_table) = physical_table { + let physical_table_name = TableName::new( + physical_table.table_info.catalog_name, + physical_table.table_info.schema_name, + physical_table.table_info.name, + ); + invalidate_keys.push(CacheIdent::TableName(physical_table_name)); + } + + (req, invalidate_keys) }; self.procedure_executor @@ -529,13 +575,7 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate( - &Context::default(), - vec![ - CacheIdent::TableId(table_id), - CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)), - ], - ) + .invalidate(&Context::default(), invalidate_keys) .await .context(error::InvalidateTableCacheSnafu)?; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 197d3dead1..d427b2a8f2 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -20,7 +20,7 @@ use std::time::Duration; use api::v1::meta::Role; use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; -use catalog::kvbackend::{CachedMetaKvBackendBuilder, MetaKvBackend}; +use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; @@ -353,11 +353,12 @@ impl GreptimeDbClusterBuilder { let cached_meta_backend = Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); + let catalog_manager = + KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone()); + let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new( - cached_meta_backend.clone(), - )), + Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())), ]); let heartbeat_task = HeartbeatTask::new( @@ -366,13 +367,17 @@ impl GreptimeDbClusterBuilder { Arc::new(handlers_executor), ); - let instance = - FrontendBuilder::new(cached_meta_backend.clone(), datanode_clients, meta_client) - .with_cache_invalidator(cached_meta_backend) - .with_heartbeat_task(heartbeat_task) - .try_build() - .await - .unwrap(); + let instance = FrontendBuilder::new( + cached_meta_backend.clone(), + catalog_manager.clone(), + datanode_clients, + meta_client, + ) + .with_cache_invalidator(catalog_manager) + .with_heartbeat_task(heartbeat_task) + .try_build() + .await + .unwrap(); Arc::new(instance) } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b0749c3a23..0c9a58284c 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use catalog::kvbackend::KvBackendCatalogManager; use cmd::options::MixOptions; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; @@ -124,6 +125,9 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); + let catalog_manager = + KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator)); + let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); let table_id_sequence = Arc::new( @@ -154,12 +158,17 @@ impl GreptimeDbStandaloneBuilder { .unwrap(), ); - let instance = - FrontendBuilder::new(kv_backend.clone(), datanode_manager, ddl_task_executor) - .with_plugin(plugins) - .try_build() - .await - .unwrap(); + let instance = FrontendBuilder::new( + kv_backend.clone(), + catalog_manager.clone(), + datanode_manager, + ddl_task_executor, + ) + .with_plugin(plugins) + .with_cache_invalidator(catalog_manager) + .try_build() + .await + .unwrap(); procedure_manager.start().await.unwrap(); wal_options_allocator.start().await.unwrap(); diff --git a/tests/cases/standalone/common/alter/alter_metric_table.result b/tests/cases/standalone/common/alter/alter_metric_table.result new file mode 100644 index 0000000000..8ae541b71e --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_metric_table.result @@ -0,0 +1,110 @@ +CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = ""); + +Affected Rows: 0 + +SHOW TABLES; + ++---------+ +| Tables | ++---------+ +| numbers | +| phy | ++---------+ + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +DESC TABLE t1; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| host | String | PRI | YES | | TAG | +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| val | Float64 | | YES | | FIELD | ++--------+----------------------+-----+------+---------+---------------+ + +DESC TABLE t2; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| job | String | PRI | YES | | TAG | +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| val | Float64 | | YES | | FIELD | ++--------+----------------------+-----+------+---------+---------------+ + +DESC TABLE phy; + ++------------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++------------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | | NO | | FIELD | +| val | Float64 | | YES | | FIELD | +| __table_id | UInt32 | PRI | NO | | TAG | +| __tsid | UInt64 | PRI | NO | | TAG | +| host | String | PRI | YES | | TAG | +| job | String | PRI | YES | | TAG | ++------------+----------------------+-----+------+---------+---------------+ + +ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY; + +Affected Rows: 0 + +ALTER TABLE t2 ADD COLUMN k STRING PRIMARY KEY; + +Affected Rows: 0 + +DESC TABLE t1; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| host | String | PRI | YES | | TAG | +| k | String | PRI | YES | | TAG | +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| val | Float64 | | YES | | FIELD | ++--------+----------------------+-----+------+---------+---------------+ + +DESC TABLE t2; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| job | String | PRI | YES | | TAG | +| k | String | PRI | YES | | TAG | +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| val | Float64 | | YES | | FIELD | ++--------+----------------------+-----+------+---------+---------------+ + +DESC TABLE phy; + ++------------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++------------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | | NO | | FIELD | +| val | Float64 | | YES | | FIELD | +| __table_id | UInt32 | PRI | NO | | TAG | +| __tsid | UInt64 | PRI | NO | | TAG | +| host | String | PRI | YES | | TAG | +| job | String | PRI | YES | | TAG | +| k | String | PRI | YES | | TAG | ++------------+----------------------+-----+------+---------+---------------+ + +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE t2; + +Affected Rows: 0 + +DROP TABLE phy; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/alter_metric_table.sql b/tests/cases/standalone/common/alter/alter_metric_table.sql new file mode 100644 index 0000000000..579dd90c48 --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_metric_table.sql @@ -0,0 +1,29 @@ +CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = ""); + +SHOW TABLES; + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); + +CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); + +DESC TABLE t1; + +DESC TABLE t2; + +DESC TABLE phy; + +ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY; + +ALTER TABLE t2 ADD COLUMN k STRING PRIMARY KEY; + +DESC TABLE t1; + +DESC TABLE t2; + +DESC TABLE phy; + +DROP TABLE t1; + +DROP TABLE t2; + +DROP TABLE phy; diff --git a/tests/cases/standalone/common/create/create_metric_table.result b/tests/cases/standalone/common/create/create_metric_table.result index b6578c5ca9..0a153ec733 100644 --- a/tests/cases/standalone/common/create/create_metric_table.result +++ b/tests/cases/standalone/common/create/create_metric_table.result @@ -11,6 +11,15 @@ SHOW TABLES; | phy | +---------+ +DESC TABLE phy; + ++--------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | +| val | Float64 | | YES | | FIELD | ++--------+----------------------+-----+------+---------+---------------+ + CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); Affected Rows: 0 diff --git a/tests/cases/standalone/common/create/create_metric_table.sql b/tests/cases/standalone/common/create/create_metric_table.sql index 28b3083d90..fcc41ff115 100644 --- a/tests/cases/standalone/common/create/create_metric_table.sql +++ b/tests/cases/standalone/common/create/create_metric_table.sql @@ -2,6 +2,8 @@ CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("phys SHOW TABLES; +DESC TABLE phy; + CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy"); CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");