diff --git a/src/cli/src/bench.rs b/src/cli/src/bench.rs index 2bd5d709b2..d08bb06948 100644 --- a/src/cli/src/bench.rs +++ b/src/cli/src/bench.rs @@ -160,6 +160,7 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { options: Default::default(), region_numbers: (1..=100).collect(), partition_key_indices: vec![], + column_ids: vec![], }; RawTableInfo { diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 68bbae6c41..61f4196b54 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -27,17 +27,18 @@ use common_telemetry::{error, info, warn}; use futures_util::future; pub use region_request::make_alter_region_request; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use strum::AsRefStr; use table::metadata::TableId; use crate::ddl::utils::{ - add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions, + add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error, + sync_follower_regions, }; use crate::ddl::DdlContext; -use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; +use crate::error::Result; use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; @@ -137,37 +138,13 @@ impl AlterLogicalTablesProcedure { .into_iter() .collect::>>()?; - // Collects responses from datanodes. 
- let phy_raw_schemas = results - .iter_mut() - .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)) - .collect::>(); - - if phy_raw_schemas.is_empty() { - self.submit_sync_region_requests(results, &physical_table_route.region_routes) - .await; - self.data.state = AlterTablesState::UpdateMetadata; - return Ok(Status::executing(true)); - } - - // Verify all the physical schemas are the same - // Safety: previous check ensures this vec is not empty - let first = phy_raw_schemas.first().unwrap(); - ensure!( - phy_raw_schemas.iter().all(|x| x == first), - MetadataCorruptionSnafu { - err_msg: "The physical schemas from datanodes are not the same." - } - ); - - // Decodes the physical raw schemas - if let Some(phy_raw_schema) = first { - self.data.physical_columns = - ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?; + if let Some(column_metadatas) = + extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)? + { + self.data.physical_columns = column_metadatas; } else { warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); } - self.submit_sync_region_requests(results, &physical_table_route.region_routes) .await; self.data.state = AlterTablesState::UpdateMetadata; @@ -183,7 +160,7 @@ impl AlterLogicalTablesProcedure { if let Err(err) = sync_follower_regions( &self.context, self.data.physical_table_id, - results, + &results, region_routes, table_info.meta.engine.as_str(), ) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index bfa0a679ea..89081a4b58 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -29,19 +29,22 @@ use common_procedure::{ Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey, PoisonKeys, Procedure, ProcedureId, Status, StringKey, }; -use common_telemetry::{debug, error, info}; +use 
common_telemetry::{debug, error, info, warn}; use futures::future::{self}; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; use store_api::storage::RegionId; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; +use crate::ddl::physical_table_metadata::update_table_info_column_ids; use crate::ddl::utils::{ - add_peer_context_if_needed, handle_multiple_results, map_to_procedure_error, - sync_follower_regions, MultipleResults, + add_peer_context_if_needed, extract_column_metadatas, handle_multiple_results, + map_to_procedure_error, sync_follower_regions, MultipleResults, }; use crate::ddl::DdlContext; use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu}; @@ -202,9 +205,9 @@ impl AlterTableProcedure { }) } MultipleResults::Ok(results) => { - self.submit_sync_region_requests(results, &physical_table_route.region_routes) + self.submit_sync_region_requests(&results, &physical_table_route.region_routes) .await; - self.data.state = AlterTableState::UpdateMetadata; + self.handle_alter_region_response(results)?; Ok(Status::executing_with_clean_poisons(true)) } MultipleResults::AllNonRetryable(error) => { @@ -220,9 +223,22 @@ impl AlterTableProcedure { } } + fn handle_alter_region_response(&mut self, mut results: Vec) -> Result<()> { + self.data.state = AlterTableState::UpdateMetadata; + if let Some(column_metadatas) = + extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)? 
+ { + self.data.column_metadatas = column_metadatas; + } else { + warn!("altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"); + } + + Ok(()) + } + async fn submit_sync_region_requests( &mut self, - results: Vec, + results: &[RegionResponse], region_routes: &[RegionRoute], ) { // Safety: filled in `prepare` step. @@ -268,10 +284,14 @@ impl AlterTableProcedure { self.on_update_metadata_for_rename(new_table_name.to_string(), table_info_value) .await?; } else { + let mut raw_table_info = new_info.into(); + if !self.data.column_metadatas.is_empty() { + update_table_info_column_ids(&mut raw_table_info, &self.data.column_metadatas); + } // region distribution is set in submit_alter_region_requests let region_distribution = self.data.region_distribution.as_ref().unwrap().clone(); self.on_update_metadata_for_alter( - new_info.into(), + raw_table_info, region_distribution, table_info_value, ) @@ -318,6 +338,16 @@ impl AlterTableProcedure { lock_key } + + #[cfg(test)] + pub(crate) fn data(&self) -> &AlterTableData { + &self.data + } + + #[cfg(test)] + pub(crate) fn mut_data(&mut self) -> &mut AlterTableData { + &mut self.data + } } #[async_trait] @@ -380,6 +410,8 @@ pub struct AlterTableData { state: AlterTableState, task: AlterTableTask, table_id: TableId, + #[serde(default)] + column_metadatas: Vec, /// Table info value before alteration. table_info_value: Option>, /// Region distribution for table in case we need to update region options. 
@@ -392,6 +424,7 @@ impl AlterTableData { state: AlterTableState::Prepare, task, table_id, + column_metadatas: vec![], table_info_value: None, region_distribution: None, } @@ -410,4 +443,14 @@ impl AlterTableData { .as_ref() .map(|value| &value.table_info) } + + #[cfg(test)] + pub(crate) fn column_metadatas(&self) -> &[ColumnMetadata] { + &self.column_metadatas + } + + #[cfg(test)] + pub(crate) fn set_column_metadatas(&mut self, column_metadatas: Vec) { + self.column_metadatas = column_metadatas; + } } diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index d1f1ac37b6..3b0a091a14 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -27,7 +27,7 @@ use common_telemetry::{debug, error, warn}; use futures::future; pub use region_request::create_region_request_builder; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::storage::{RegionId, RegionNumber}; @@ -35,10 +35,11 @@ use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; use crate::ddl::utils::{ - add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions, + add_peer_context_if_needed, extract_column_metadatas, map_to_procedure_error, + sync_follower_regions, }; use crate::ddl::DdlContext; -use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; +use crate::error::Result; use crate::key::table_route::TableRouteValue; use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::metrics; @@ -166,47 +167,23 @@ impl CreateLogicalTablesProcedure { .into_iter() .collect::>>()?; - // Collects response from datanodes. 
- let phy_raw_schemas = results - .iter_mut() - .map(|res| res.extensions.remove(ALTER_PHYSICAL_EXTENSION_KEY)) - .collect::>(); - - if phy_raw_schemas.is_empty() { - self.submit_sync_region_requests(results, region_routes) - .await; - self.data.state = CreateTablesState::CreateMetadata; - return Ok(Status::executing(false)); - } - - // Verify all the physical schemas are the same - // Safety: previous check ensures this vec is not empty - let first = phy_raw_schemas.first().unwrap(); - ensure!( - phy_raw_schemas.iter().all(|x| x == first), - MetadataCorruptionSnafu { - err_msg: "The physical schemas from datanodes are not the same." - } - ); - - // Decodes the physical raw schemas - if let Some(phy_raw_schemas) = first { - self.data.physical_columns = - ColumnMetadata::decode_list(phy_raw_schemas).context(DecodeJsonSnafu)?; + if let Some(column_metadatas) = + extract_column_metadatas(&mut results, ALTER_PHYSICAL_EXTENSION_KEY)? + { + self.data.physical_columns = column_metadatas; } else { warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); } - self.submit_sync_region_requests(results, region_routes) + self.submit_sync_region_requests(&results, region_routes) .await; self.data.state = CreateTablesState::CreateMetadata; - Ok(Status::executing(true)) } async fn submit_sync_region_requests( &self, - results: Vec, + results: &[RegionResponse], region_routes: &[RegionRoute], ) { if let Err(err) = sync_follower_regions( diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 1a62c8e716..8781bbbf19 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -22,20 +22,23 @@ use common_procedure::error::{ ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; -use common_telemetry::info; use 
common_telemetry::tracing_context::TracingContext; +use common_telemetry::{info, warn}; use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; +use crate::ddl::physical_table_metadata::update_table_info_column_ids; use crate::ddl::utils::{ - add_peer_context_if_needed, convert_region_routes_to_detecting_regions, map_to_procedure_error, - region_storage_path, + add_peer_context_if_needed, convert_region_routes_to_detecting_regions, + extract_column_metadatas, map_to_procedure_error, region_storage_path, }; use crate::ddl::{DdlContext, TableMetadata}; use crate::error::{self, Result}; @@ -243,14 +246,21 @@ impl CreateTableProcedure { } } - join_all(create_region_tasks) + self.creator.data.state = CreateTableState::CreateMetadata; + + let mut results = join_all(create_region_tasks) .await .into_iter() .collect::>>()?; - self.creator.data.state = CreateTableState::CreateMetadata; + if let Some(column_metadatas) = + extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)? + { + self.creator.data.column_metadatas = column_metadatas; + } else { + warn!("creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"); + } - // TODO(weny): Add more tests. 
Ok(Status::executing(true)) } @@ -262,7 +272,10 @@ impl CreateTableProcedure { let table_id = self.table_id(); let manager = &self.context.table_metadata_manager; - let raw_table_info = self.table_info().clone(); + let mut raw_table_info = self.table_info().clone(); + if !self.creator.data.column_metadatas.is_empty() { + update_table_info_column_ids(&mut raw_table_info, &self.creator.data.column_metadatas); + } // Safety: the region_wal_options must be allocated. let region_wal_options = self.region_wal_options()?.clone(); // Safety: the table_route must be allocated. @@ -346,6 +359,7 @@ impl TableCreator { Self { data: CreateTableData { state: CreateTableState::Prepare, + column_metadatas: vec![], task, table_route: None, region_wal_options: None, @@ -407,6 +421,8 @@ pub enum CreateTableState { pub struct CreateTableData { pub state: CreateTableState, pub task: CreateTableTask, + #[serde(default)] + pub column_metadatas: Vec, /// None stands for not allocated yet. table_route: Option, /// None stands for not allocated yet. diff --git a/src/common/meta/src/ddl/physical_table_metadata.rs b/src/common/meta/src/ddl/physical_table_metadata.rs index 376a143133..37c7446e2f 100644 --- a/src/common/meta/src/ddl/physical_table_metadata.rs +++ b/src/common/meta/src/ddl/physical_table_metadata.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use api::v1::SemanticType; +use common_telemetry::debug; +use common_telemetry::tracing::warn; use store_api::metadata::ColumnMetadata; use table::metadata::RawTableInfo; @@ -23,6 +25,10 @@ pub(crate) fn build_new_physical_table_info( mut raw_table_info: RawTableInfo, physical_columns: &[ColumnMetadata], ) -> RawTableInfo { + debug!( + "building new physical table info for table: {}, table_id: {}", + raw_table_info.name, raw_table_info.ident.table_id + ); let existing_columns = raw_table_info .meta .schema @@ -36,6 +42,8 @@ pub(crate) fn build_new_physical_table_info( let time_index = &mut raw_table_info.meta.schema.timestamp_index; let columns = &mut raw_table_info.meta.schema.column_schemas; columns.clear(); + let column_ids = &mut raw_table_info.meta.column_ids; + column_ids.clear(); for (idx, col) in physical_columns.iter().enumerate() { match col.semantic_type { @@ -50,6 +58,7 @@ pub(crate) fn build_new_physical_table_info( } columns.push(col.column_schema.clone()); + column_ids.push(col.column_id); } if let Some(time_index) = *time_index { @@ -58,3 +67,54 @@ pub(crate) fn build_new_physical_table_info( raw_table_info } + +/// Updates the column IDs in the table info based on the provided column metadata. +/// +/// This function validates that the column metadata matches the existing table schema +/// before updating the column ids. If the column metadata doesn't match the table schema, +/// the table info remains unchanged. 
+pub(crate) fn update_table_info_column_ids( + raw_table_info: &mut RawTableInfo, + column_metadatas: &[ColumnMetadata], +) { + let mut table_column_names = raw_table_info + .meta + .schema + .column_schemas + .iter() + .map(|c| c.name.as_str()) + .collect::>(); + table_column_names.sort_unstable(); + + let mut column_names = column_metadatas + .iter() + .map(|c| c.column_schema.name.as_str()) + .collect::>(); + column_names.sort_unstable(); + + if table_column_names != column_names { + warn!( + "Column metadata doesn't match the table schema for table {}, table_id: {}, column in table: {:?}, column in metadata: {:?}", + raw_table_info.name, + raw_table_info.ident.table_id, + table_column_names, + column_names, + ); + return; + } + + let name_to_id = column_metadatas + .iter() + .map(|c| (c.column_schema.name.clone(), c.column_id)) + .collect::>(); + + let schema = &raw_table_info.meta.schema.column_schemas; + let mut column_ids = Vec::with_capacity(schema.len()); + for column_schema in schema { + if let Some(id) = name_to_id.get(&column_schema.name) { + column_ids.push(*id); + } + } + + raw_table_info.meta.column_ids = column_ids; +} diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index b2755df2c1..f07e2c2f6f 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -122,6 +122,7 @@ impl TableMetadataAllocator { ); let peers = self.peer_allocator.alloc(regions).await?; + debug!("Allocated peers {:?} for table {}", peers, table_id); let region_routes = task .partitions .iter() diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index c440e517d5..24fa5522fa 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -24,7 +24,14 @@ use std::collections::HashMap; use api::v1::meta::Partition; use api::v1::{ColumnDataType, SemanticType}; use common_procedure::Status; -use 
store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, + METRIC_ENGINE_NAME, +}; +use store_api::storage::consts::ReservedColumnId; use table::metadata::{RawTableInfo, TableId}; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; @@ -146,6 +153,7 @@ pub fn test_create_logical_table_task(name: &str) -> CreateTableTask { } } +/// Creates a physical table task with a single region. pub fn test_create_physical_table_task(name: &str) -> CreateTableTask { let create_table = TestCreateTableExprBuilder::default() .column_defs([ @@ -182,3 +190,95 @@ pub fn test_create_physical_table_task(name: &str) -> CreateTableTask { table_info, } } + +/// Creates a column metadata list with tag fields. +pub fn test_column_metadatas(tag_fields: &[&str]) -> Vec { + let mut output = Vec::with_capacity(tag_fields.len() + 4); + output.extend([ + ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 0, + }, + ColumnMetadata { + column_schema: ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 1, + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: ReservedColumnId::table_id(), + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME, + ConcreteDataType::float64_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: ReservedColumnId::tsid(), + }, + ]); + + for (i, name) in 
tag_fields.iter().enumerate() { + output.push(ColumnMetadata { + column_schema: ColumnSchema::new( + name.to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: (i + 2) as u32, + }); + } + + output +} + +/// Asserts the column names. +pub fn assert_column_name(table_info: &RawTableInfo, expected_column_names: &[&str]) { + assert_eq!( + table_info + .meta + .schema + .column_schemas + .iter() + .map(|c| c.name.to_string()) + .collect::>(), + expected_column_names + ); +} + +/// Asserts the column metadatas +pub fn assert_column_name_and_id(column_metadatas: &[ColumnMetadata], expected: &[(&str, u32)]) { + assert_eq!(expected.len(), column_metadatas.len()); + for (name, id) in expected { + let column_metadata = column_metadatas + .iter() + .find(|c| c.column_id == *id) + .unwrap(); + assert_eq!(column_metadata.column_schema.name, *name); + } +} + +/// Gets the raw table info. +pub async fn get_raw_table_info(ddl_context: &DdlContext, table_id: TableId) -> RawTableInfo { + ddl_context + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await + .unwrap() + .unwrap() + .into_inner() + .table_info +} diff --git a/src/common/meta/src/ddl/test_util/create_table.rs b/src/common/meta/src/ddl/test_util/create_table.rs index 9d99bbf5c6..b0b7b3a5c9 100644 --- a/src/common/meta/src/ddl/test_util/create_table.rs +++ b/src/common/meta/src/ddl/test_util/create_table.rs @@ -132,6 +132,7 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo { options: TableOptions::try_from_iter(&expr.table_options).unwrap(), created_on: DateTime::default(), partition_key_indices: vec![], + column_ids: vec![], }, table_type: TableType::Base, } diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index 775fc644f7..4688cfa987 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ 
b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use api::region::RegionResponse; use api::v1::region::RegionRequest; use common_error::ext::{BoxedError, ErrorExt, StackError}; @@ -44,10 +46,13 @@ impl MockDatanodeHandler for () { } } +type RegionRequestHandler = + Arc Result + Send + Sync>; + #[derive(Clone)] pub struct DatanodeWatcher { sender: mpsc::Sender<(Peer, RegionRequest)>, - handler: Option Result>, + handler: Option, } impl DatanodeWatcher { @@ -60,9 +65,9 @@ impl DatanodeWatcher { pub fn with_handler( mut self, - user_handler: fn(Peer, RegionRequest) -> Result, + user_handler: impl Fn(Peer, RegionRequest) -> Result + Send + Sync + 'static, ) -> Self { - self.handler = Some(user_handler); + self.handler = Some(Arc::new(user_handler)); self } } @@ -75,7 +80,7 @@ impl MockDatanodeHandler for DatanodeWatcher { .send((peer.clone(), request.clone())) .await .unwrap(); - if let Some(handler) = self.handler { + if let Some(handler) = self.handler.as_ref() { handler(peer.clone(), request) } else { Ok(RegionResponse::new(0)) diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 01ab8e513c..20733bec03 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -23,17 +23,20 @@ use api::v1::{ColumnDataType, SemanticType}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::{Procedure, ProcedureId, Status}; use common_procedure_test::MockContextProvider; -use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::{ALTER_PHYSICAL_EXTENSION_KEY, MANIFEST_INFO_EXTENSION_KEY}; use store_api::region_engine::RegionManifestInfo; +use 
store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; use tokio::sync::mpsc; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder; use crate::ddl::test_util::columns::TestColumnDefBuilder; -use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler}; +use crate::ddl::test_util::datanode_handler::DatanodeWatcher; use crate::ddl::test_util::{ - create_logical_table, create_physical_table, create_physical_table_metadata, + assert_column_name, create_logical_table, create_physical_table, + create_physical_table_metadata, get_raw_table_info, test_column_metadatas, test_create_physical_table_task, }; use crate::error::Error::{AlterLogicalTablesInvalidArguments, TableNotFound}; @@ -96,6 +99,52 @@ fn make_alter_logical_table_rename_task( } } +fn make_alters_request_handler( + column_metadatas: Vec, +) -> impl Fn(Peer, RegionRequest) -> Result { + move |_peer: Peer, request: RegionRequest| { + if let region_request::Body::Alters(_) = request.body.unwrap() { + let mut response = RegionResponse::new(0); + // Default region id for physical table. 
+ let region_id = RegionId::new(1000, 1); + response.extensions.insert( + MANIFEST_INFO_EXTENSION_KEY.to_string(), + RegionManifestInfo::encode_list(&[( + region_id, + RegionManifestInfo::metric(1, 0, 2, 0), + )]) + .unwrap(), + ); + response.extensions.insert( + ALTER_PHYSICAL_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&column_metadatas).unwrap(), + ); + return Ok(response); + } + Ok(RegionResponse::new(0)) + } +} + +fn assert_alters_request( + peer: Peer, + request: RegionRequest, + expected_peer_id: u64, + expected_region_ids: &[RegionId], +) { + assert_eq!(peer.id, expected_peer_id,); + let Some(region_request::Body::Alters(req)) = request.body else { + unreachable!(); + }; + for (i, region_id) in expected_region_ids.iter().enumerate() { + assert_eq!( + req.requests[i].region_id, + *region_id, + "actual region id: {}", + RegionId::from_u64(req.requests[i].region_id) + ); + } +} + #[tokio::test] async fn test_on_prepare_check_schema() { let node_manager = Arc::new(MockDatanodeManager::new(())); @@ -205,15 +254,20 @@ async fn test_on_prepare() { #[tokio::test] async fn test_on_update_metadata() { - let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + common_telemetry::init_default_ut_logging(); + let (tx, mut rx) = mpsc::channel(8); + let test_column_metadatas = test_column_metadatas(&["new_col", "mew_col"]); + let datanode_handler = + DatanodeWatcher::new(tx).with_handler(make_alters_request_handler(test_column_metadatas)); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); let ddl_context = new_ddl_context(node_manager); // Creates physical table let phy_id = create_physical_table(&ddl_context, "phy").await; // Creates 3 logical tables - create_logical_table(ddl_context.clone(), phy_id, "table1").await; - create_logical_table(ddl_context.clone(), phy_id, "table2").await; - create_logical_table(ddl_context.clone(), phy_id, "table3").await; + let logical_table1_id = 
create_logical_table(ddl_context.clone(), phy_id, "table1").await; + let logical_table2_id = create_logical_table(ddl_context.clone(), phy_id, "table2").await; + let logical_table3_id = create_logical_table(ddl_context.clone(), phy_id, "table3").await; create_logical_table(ddl_context.clone(), phy_id, "table4").await; create_logical_table(ddl_context.clone(), phy_id, "table5").await; @@ -223,7 +277,7 @@ async fn test_on_update_metadata() { make_alter_logical_table_add_column_task(None, "table3", vec!["new_col".to_string()]), ]; - let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context); + let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context.clone()); let mut status = procedure.on_prepare().await.unwrap(); assert_matches!( status, @@ -255,18 +309,52 @@ async fn test_on_update_metadata() { clean_poisons: false } ); + let (peer, request) = rx.try_recv().unwrap(); + rx.try_recv().unwrap_err(); + assert_alters_request( + peer, + request, + 0, + &[ + RegionId::new(logical_table1_id, 0), + RegionId::new(logical_table2_id, 0), + RegionId::new(logical_table3_id, 0), + ], + ); + + let table_info = get_raw_table_info(&ddl_context, phy_id).await; + assert_column_name( + &table_info, + &["ts", "value", "__table_id", "__tsid", "new_col", "mew_col"], + ); + assert_eq!( + table_info.meta.column_ids, + vec![ + 0, + 1, + ReservedColumnId::table_id(), + ReservedColumnId::tsid(), + 2, + 3 + ] + ); } #[tokio::test] async fn test_on_part_duplicate_alter_request() { - let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); - let ddl_context = new_ddl_context(node_manager); + common_telemetry::init_default_ut_logging(); + let (tx, mut rx) = mpsc::channel(8); + let column_metadatas = test_column_metadatas(&["col_0"]); + let handler = + DatanodeWatcher::new(tx).with_handler(make_alters_request_handler(column_metadatas)); + let node_manager = Arc::new(MockDatanodeManager::new(handler)); + let mut ddl_context = 
new_ddl_context(node_manager); // Creates physical table let phy_id = create_physical_table(&ddl_context, "phy").await; // Creates 3 logical tables - create_logical_table(ddl_context.clone(), phy_id, "table1").await; - create_logical_table(ddl_context.clone(), phy_id, "table2").await; + let logical_table1_id = create_logical_table(ddl_context.clone(), phy_id, "table1").await; + let logical_table2_id = create_logical_table(ddl_context.clone(), phy_id, "table2").await; let tasks = vec![ make_alter_logical_table_add_column_task(None, "table1", vec!["col_0".to_string()]), @@ -305,6 +393,40 @@ async fn test_on_part_duplicate_alter_request() { clean_poisons: false } ); + let (peer, request) = rx.try_recv().unwrap(); + rx.try_recv().unwrap_err(); + assert_alters_request( + peer, + request, + 0, + &[ + RegionId::new(logical_table1_id, 0), + RegionId::new(logical_table2_id, 0), + ], + ); + + let table_info = get_raw_table_info(&ddl_context, phy_id).await; + assert_column_name( + &table_info, + &["ts", "value", "__table_id", "__tsid", "col_0"], + ); + assert_eq!( + table_info.meta.column_ids, + vec![ + 0, + 1, + ReservedColumnId::table_id(), + ReservedColumnId::tsid(), + 2 + ] + ); + + let (tx, mut rx) = mpsc::channel(8); + let column_metadatas = test_column_metadatas(&["col_0", "new_col_1", "new_col_2"]); + let handler = + DatanodeWatcher::new(tx).with_handler(make_alters_request_handler(column_metadatas)); + let node_manager = Arc::new(MockDatanodeManager::new(handler)); + ddl_context.node_manager = node_manager; // re-alter let tasks = vec![ @@ -357,6 +479,44 @@ async fn test_on_part_duplicate_alter_request() { } ); + let (peer, request) = rx.try_recv().unwrap(); + rx.try_recv().unwrap_err(); + assert_alters_request( + peer, + request, + 0, + &[ + RegionId::new(logical_table1_id, 0), + RegionId::new(logical_table2_id, 0), + ], + ); + + let table_info = get_raw_table_info(&ddl_context, phy_id).await; + assert_column_name( + &table_info, + &[ + "ts", + "value", + 
"__table_id", + "__tsid", + "col_0", + "new_col_1", + "new_col_2", + ], + ); + assert_eq!( + table_info.meta.column_ids, + vec![ + 0, + 1, + ReservedColumnId::table_id(), + ReservedColumnId::tsid(), + 2, + 3, + 4, + ] + ); + let table_name_keys = vec![ TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table1"), TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table2"), @@ -422,27 +582,13 @@ async fn test_on_part_duplicate_alter_request() { ); } -fn alters_request_handler(_peer: Peer, request: RegionRequest) -> Result { - if let region_request::Body::Alters(_) = request.body.unwrap() { - let mut response = RegionResponse::new(0); - // Default region id for physical table. - let region_id = RegionId::new(1000, 1); - response.extensions.insert( - MANIFEST_INFO_EXTENSION_KEY.to_string(), - RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::metric(1, 0, 2, 0))]) - .unwrap(), - ); - return Ok(response); - } - - Ok(RegionResponse::new(0)) -} - #[tokio::test] async fn test_on_submit_alter_region_request() { common_telemetry::init_default_ut_logging(); let (tx, mut rx) = mpsc::channel(8); - let handler = DatanodeWatcher::new(tx).with_handler(alters_request_handler); + let column_metadatas = test_column_metadatas(&["new_col", "mew_col"]); + let handler = + DatanodeWatcher::new(tx).with_handler(make_alters_request_handler(column_metadatas)); let node_manager = Arc::new(MockDatanodeManager::new(handler)); let ddl_context = new_ddl_context(node_manager); diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index 26e07117c8..08e39ced45 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -30,7 +30,12 @@ use common_error::status_code::StatusCode; use common_procedure::store::poison_store::PoisonStore; use common_procedure::{ProcedureId, Status}; use common_procedure_test::MockContextProvider; -use 
store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::{ + MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY, +}; use store_api::region_engine::RegionManifestInfo; use store_api::storage::RegionId; use table::requests::TTL_KEY; @@ -43,6 +48,7 @@ use crate::ddl::test_util::datanode_handler::{ AllFailureDatanodeHandler, DatanodeWatcher, PartialSuccessDatanodeHandler, RequestOutdatedErrorDatanodeHandler, }; +use crate::ddl::test_util::{assert_column_name, assert_column_name_and_id}; use crate::error::{Error, Result}; use crate::key::datanode_table::DatanodeTableKey; use crate::key::table_name::TableNameKey; @@ -179,6 +185,30 @@ fn alter_request_handler(_peer: Peer, request: RegionRequest) -> Result Result, +) -> impl Fn(Peer, RegionRequest) -> Result { + move |_peer, request| { + let _ = _peer; + if let region_request::Body::Creates(_) = request.body.unwrap() { + let mut response = RegionResponse::new(0); + // Default region id for physical table. 
+ let region_id = RegionId::new(1024, 1); + response.extensions.insert( + MANIFEST_INFO_EXTENSION_KEY.to_string(), + RegionManifestInfo::encode_list(&[( + region_id, + RegionManifestInfo::metric(1, 0, 2, 0), + )]) + .unwrap(), + ); + response.extensions.insert( + ALTER_PHYSICAL_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&column_metadatas).unwrap(), + ); + return Ok(response); + } + + Ok(RegionResponse::new(0)) + } +} + +fn assert_creates_request( + peer: Peer, + request: RegionRequest, + expected_peer_id: u64, + expected_region_ids: &[RegionId], +) { + assert_eq!(peer.id, expected_peer_id,); + let Some(region_request::Body::Creates(req)) = request.body else { + unreachable!(); + }; + for (i, region_id) in expected_region_ids.iter().enumerate() { + assert_eq!( + req.requests[i].region_id, + *region_id, + "actual region id: {}", + RegionId::from_u64(req.requests[i].region_id) + ); + } +} + #[tokio::test] async fn test_on_prepare_physical_table_not_found() { let node_manager = Arc::new(MockDatanodeManager::new(())); @@ -227,7 +278,12 @@ async fn test_on_prepare_part_logical_tables_exist() { #[tokio::test] async fn test_on_create_metadata() { - let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + common_telemetry::init_default_ut_logging(); + let (tx, mut rx) = mpsc::channel(8); + let column_metadatas = test_column_metadatas(&["host", "cpu"]); + let datanode_handler = + DatanodeWatcher::new(tx).with_handler(make_creates_request_handler(column_metadatas)); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); let ddl_context = new_ddl_context(node_manager); // Prepares physical table metadata. 
let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -255,7 +311,7 @@ async fn test_on_create_metadata() { let mut procedure = CreateLogicalTablesProcedure::new( vec![task, yet_another_task], physical_table_id, - ddl_context, + ddl_context.clone(), ); let status = procedure.on_prepare().await.unwrap(); assert_matches!( @@ -274,11 +330,42 @@ async fn test_on_create_metadata() { let status = procedure.execute(&ctx).await.unwrap(); let table_ids = status.downcast_output_ref::>().unwrap(); assert_eq!(*table_ids, vec![1025, 1026]); + + let (peer, request) = rx.try_recv().unwrap(); + rx.try_recv().unwrap_err(); + assert_creates_request( + peer, + request, + 0, + &[RegionId::new(1025, 0), RegionId::new(1026, 0)], + ); + + let table_info = get_raw_table_info(&ddl_context, table_id).await; + assert_column_name( + &table_info, + &["ts", "value", "__table_id", "__tsid", "host", "cpu"], + ); + assert_eq!( + table_info.meta.column_ids, + vec![ + 0, + 1, + ReservedColumnId::table_id(), + ReservedColumnId::tsid(), + 2, + 3 + ] + ); } #[tokio::test] async fn test_on_create_metadata_part_logical_tables_exist() { - let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + common_telemetry::init_default_ut_logging(); + let (tx, mut rx) = mpsc::channel(8); + let column_metadatas = test_column_metadatas(&["host", "cpu"]); + let datanode_handler = + DatanodeWatcher::new(tx).with_handler(make_creates_request_handler(column_metadatas)); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); let ddl_context = new_ddl_context(node_manager); // Prepares physical table metadata. 
let mut create_physical_table_task = test_create_physical_table_task("phy_table"); @@ -317,7 +404,7 @@ async fn test_on_create_metadata_part_logical_tables_exist() { let mut procedure = CreateLogicalTablesProcedure::new( vec![task, non_exist_task], physical_table_id, - ddl_context, + ddl_context.clone(), ); let status = procedure.on_prepare().await.unwrap(); assert_matches!( @@ -336,6 +423,27 @@ async fn test_on_create_metadata_part_logical_tables_exist() { let status = procedure.execute(&ctx).await.unwrap(); let table_ids = status.downcast_output_ref::>().unwrap(); assert_eq!(*table_ids, vec![8192, 1025]); + + let (peer, request) = rx.try_recv().unwrap(); + rx.try_recv().unwrap_err(); + assert_creates_request(peer, request, 0, &[RegionId::new(1025, 0)]); + + let table_info = get_raw_table_info(&ddl_context, table_id).await; + assert_column_name( + &table_info, + &["ts", "value", "__table_id", "__tsid", "host", "cpu"], + ); + assert_eq!( + table_info.meta.column_ids, + vec![ + 0, + 1, + ReservedColumnId::table_id(), + ReservedColumnId::tsid(), + 2, + 3 + ] + ); } #[tokio::test] @@ -399,27 +507,13 @@ async fn test_on_create_metadata_err() { assert!(!error.is_retry_later()); } -fn creates_request_handler(_peer: Peer, request: RegionRequest) -> Result { - if let region_request::Body::Creates(_) = request.body.unwrap() { - let mut response = RegionResponse::new(0); - // Default region id for physical table. 
- let region_id = RegionId::new(1024, 1); - response.extensions.insert( - MANIFEST_INFO_EXTENSION_KEY.to_string(), - RegionManifestInfo::encode_list(&[(region_id, RegionManifestInfo::metric(1, 0, 2, 0))]) - .unwrap(), - ); - return Ok(response); - } - - Ok(RegionResponse::new(0)) -} - #[tokio::test] async fn test_on_submit_create_request() { common_telemetry::init_default_ut_logging(); let (tx, mut rx) = mpsc::channel(8); - let handler = DatanodeWatcher::new(tx).with_handler(creates_request_handler); + let column_metadatas = test_column_metadatas(&["host", "cpu"]); + let handler = + DatanodeWatcher::new(tx).with_handler(make_creates_request_handler(column_metadatas)); let node_manager = Arc::new(MockDatanodeManager::new(handler)); let ddl_context = new_ddl_context(node_manager); let mut create_physical_table_task = test_create_physical_table_task("phy_table"); diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index c4cae2233d..8e8d70957d 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -16,7 +16,9 @@ use std::assert_matches::assert_matches; use std::collections::HashMap; use std::sync::Arc; -use api::v1::meta::Partition; +use api::region::RegionResponse; +use api::v1::meta::{Partition, Peer}; +use api::v1::region::{region_request, RegionRequest}; use api::v1::{ColumnDataType, SemanticType}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -24,7 +26,12 @@ use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Stat use common_procedure_test::{ execute_procedure_until, execute_procedure_until_done, MockContextProvider, }; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; use store_api::storage::RegionId; +use tokio::sync::mpsc; use 
crate::ddl::create_table::{CreateTableProcedure, CreateTableState}; use crate::ddl::test_util::columns::TestColumnDefBuilder; @@ -32,14 +39,73 @@ use crate::ddl::test_util::create_table::{ build_raw_table_info_from_expr, TestCreateTableExprBuilder, }; use crate::ddl::test_util::datanode_handler::{ - NaiveDatanodeHandler, RetryErrorDatanodeHandler, UnexpectedErrorDatanodeHandler, + DatanodeWatcher, NaiveDatanodeHandler, RetryErrorDatanodeHandler, + UnexpectedErrorDatanodeHandler, }; -use crate::error::Error; +use crate::ddl::test_util::{assert_column_name, get_raw_table_info}; +use crate::error::{Error, Result}; use crate::key::table_route::TableRouteValue; use crate::kv_backend::memory::MemoryKvBackend; use crate::rpc::ddl::CreateTableTask; use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager}; +fn create_request_handler(_peer: Peer, request: RegionRequest) -> Result { + let _ = _peer; + if let region_request::Body::Create(_) = request.body.unwrap() { + let mut response = RegionResponse::new(0); + + response.extensions.insert( + TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(), + ColumnMetadata::encode_list(&[ + ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 0, + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + "host", + ConcreteDataType::float64_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + "cpu", + ConcreteDataType::float64_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }, + ]) + .unwrap(), + ); + return Ok(response); + } + + Ok(RegionResponse::new(0)) +} + +fn assert_create_request( + peer: Peer, + request: RegionRequest, + expected_peer_id: u64, + expected_region_id: RegionId, +) { + assert_eq!(peer.id, expected_peer_id); + let 
Some(region_request::Body::Create(req)) = request.body else { + unreachable!(); + }; + assert_eq!(req.region_id, expected_region_id); +} + pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask { let create_table = TestCreateTableExprBuilder::default() .column_defs([ @@ -230,11 +296,13 @@ async fn test_on_create_metadata_error() { #[tokio::test] async fn test_on_create_metadata() { common_telemetry::init_default_ut_logging(); - let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let (tx, mut rx) = mpsc::channel(8); + let datanode_handler = DatanodeWatcher::new(tx).with_handler(create_request_handler); + let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler)); let ddl_context = new_ddl_context(node_manager); let task = test_create_table_task("foo"); assert!(!task.create_table.create_if_not_exists); - let mut procedure = CreateTableProcedure::new(task, ddl_context); + let mut procedure = CreateTableProcedure::new(task, ddl_context.clone()); procedure.on_prepare().await.unwrap(); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), @@ -243,8 +311,16 @@ async fn test_on_create_metadata() { procedure.execute(&ctx).await.unwrap(); // Triggers procedure to create table metadata let status = procedure.execute(&ctx).await.unwrap(); - let table_id = status.downcast_output_ref::().unwrap(); - assert_eq!(*table_id, 1024); + let table_id = *status.downcast_output_ref::().unwrap(); + assert_eq!(table_id, 1024); + + let (peer, request) = rx.try_recv().unwrap(); + rx.try_recv().unwrap_err(); + assert_create_request(peer, request, 0, RegionId::new(table_id, 0)); + + let table_info = get_raw_table_info(&ddl_context, table_id).await; + assert_column_name(&table_info, &["ts", "host", "cpu"]); + assert_eq!(table_info.meta.column_ids, vec![0, 1, 2]); } #[tokio::test] diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index eb1299334f..521d1dd817 100644 --- 
a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -29,6 +29,7 @@ use common_telemetry::{error, info, warn}; use common_wal::options::WalOptions; use futures::future::join_all; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, MANIFEST_INFO_EXTENSION_KEY}; use store_api::region_engine::RegionManifestInfo; use store_api::storage::{RegionId, RegionNumber}; @@ -37,8 +38,8 @@ use table::table_reference::TableReference; use crate::ddl::{DdlContext, DetectingRegion}; use crate::error::{ - self, Error, OperateDatanodeSnafu, ParseWalOptionsSnafu, Result, TableNotFoundSnafu, - UnsupportedSnafu, + self, DecodeJsonSnafu, Error, MetadataCorruptionSnafu, OperateDatanodeSnafu, + ParseWalOptionsSnafu, Result, TableNotFoundSnafu, UnsupportedSnafu, }; use crate::key::datanode_table::DatanodeTableValue; use crate::key::table_name::TableNameKey; @@ -314,11 +315,23 @@ pub fn parse_manifest_infos_from_extensions( Ok(data_manifest_version) } +/// Parses column metadatas from extensions. +pub fn parse_column_metadatas( + extensions: &HashMap>, + key: &str, +) -> Result> { + let value = extensions.get(key).context(error::UnexpectedSnafu { + err_msg: format!("column metadata extension not found: {}", key), + })?; + let column_metadatas = ColumnMetadata::decode_list(value).context(error::SerdeJsonSnafu {})?; + Ok(column_metadatas) +} + /// Sync follower regions on datanodes. pub async fn sync_follower_regions( context: &DdlContext, table_id: TableId, - results: Vec, + results: &[RegionResponse], region_routes: &[RegionRoute], engine: &str, ) -> Result<()> { @@ -331,7 +344,7 @@ pub async fn sync_follower_regions( } let results = results - .into_iter() + .iter() .map(|response| parse_manifest_infos_from_extensions(&response.extensions)) .collect::>>()? 
.into_iter() @@ -418,6 +431,38 @@ pub async fn sync_follower_regions( Ok(()) } +/// Extracts column metadatas from extensions. +pub fn extract_column_metadatas( + results: &mut [RegionResponse], + key: &str, +) -> Result>> { + let schemas = results + .iter_mut() + .map(|r| r.extensions.remove(key)) + .collect::>(); + + if schemas.is_empty() { + return Ok(None); + } + + // Verify all the physical schemas are the same + // Safety: previous check ensures this vec is not empty + let first = schemas.first().unwrap(); + ensure!( + schemas.iter().all(|x| x == first), + MetadataCorruptionSnafu { + err_msg: "The table column metadata schemas from datanodes are not the same." + } + ); + + if let Some(first) = first { + let column_metadatas = ColumnMetadata::decode_list(first).context(DecodeJsonSnafu)?; + Ok(Some(column_metadatas)) + } else { + Ok(None) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 3249bf686b..2de5abd764 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -334,6 +334,7 @@ mod tests { options: Default::default(), region_numbers: vec![1], partition_key_indices: vec![], + column_ids: vec![], }; RawTableInfo { diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 99eb24cebb..6a46d5e74b 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -1358,6 +1358,7 @@ mod tests { options: Default::default(), created_on: Default::default(), partition_key_indices: Default::default(), + column_ids: Default::default(), }; // construct RawTableInfo diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index f2420522ae..33898720bc 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -195,6 +195,7 @@ pub mod test_data { options: TableOptions::default(), created_on: DateTime::default(), partition_key_indices: 
vec![], + column_ids: vec![], }, table_type: TableType::Base, } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index d0bcc2797c..904fd45a47 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -1644,6 +1644,7 @@ fn create_table_info( options: table_options, created_on: Utc::now(), partition_key_indices, + column_ids: vec![], }; let desc = if create_table.desc.is_empty() { diff --git a/src/store-api/src/metric_engine_consts.rs b/src/store-api/src/metric_engine_consts.rs index bf0f405812..9c4d4974a7 100644 --- a/src/store-api/src/metric_engine_consts.rs +++ b/src/store-api/src/metric_engine_consts.rs @@ -73,6 +73,10 @@ pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table"; /// Represent a list of column metadata that are added to physical table. pub const ALTER_PHYSICAL_EXTENSION_KEY: &str = "ALTER_PHYSICAL"; +/// HashMap key to be used in the region server's extension response. +/// Represent the column metadata of a table. +pub const TABLE_COLUMN_METADATA_EXTENSION_KEY: &str = "TABLE_COLUMN_METADATA"; + /// HashMap key to be used in the region server's extension response. /// Represent the manifest info of a region. pub const MANIFEST_INFO_EXTENSION_KEY: &str = "MANIFEST_INFO"; diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index fc27c2ff10..5e275fea85 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -460,7 +460,7 @@ pub struct RegionStatistic { } /// The manifest info of a region. 
-#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum RegionManifestInfo { Mito { manifest_version: u64, diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index f68d85164d..48e041a5df 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -134,6 +134,8 @@ pub struct TableMeta { pub created_on: DateTime, #[builder(default = "Vec::new()")] pub partition_key_indices: Vec, + #[builder(default = "Vec::new()")] + pub column_ids: Vec, } impl TableMetaBuilder { @@ -150,6 +152,7 @@ impl TableMetaBuilder { options: None, created_on: None, partition_key_indices: None, + column_ids: None, } } } @@ -178,6 +181,7 @@ impl TableMetaBuilder { options: None, created_on: None, partition_key_indices: None, + column_ids: None, } } } @@ -1114,7 +1118,7 @@ pub struct RawTableMeta { /// Engine type of this table. Usually in small case. pub engine: String, /// Next column id of a new column. - /// Deprecated. See https://github.com/GreptimeTeam/greptimedb/issues/2982 + /// It's used to ensure all columns with the same name across all regions have the same column id. pub next_column_id: ColumnId, pub region_numbers: Vec, pub options: TableOptions, @@ -1122,6 +1126,10 @@ pub struct RawTableMeta { /// Order doesn't matter to this array. #[serde(default)] pub partition_key_indices: Vec, + /// Column ids of the table's columns, in the same order as the schema's column schemas. + /// Note: This field may be empty for older versions that did not include this field. 
+ #[serde(default)] + pub column_ids: Vec, } impl From for RawTableMeta { @@ -1136,6 +1144,7 @@ impl From for RawTableMeta { options: meta.options, created_on: meta.created_on, partition_key_indices: meta.partition_key_indices, + column_ids: meta.column_ids, } } } @@ -1154,6 +1163,7 @@ impl TryFrom for TableMeta { options: raw.options, created_on: raw.created_on, partition_key_indices: raw.partition_key_indices, + column_ids: raw.column_ids, }) } } @@ -1171,6 +1181,24 @@ pub struct RawTableInfo { } impl RawTableInfo { + /// Returns the map of column name to column id. + /// + /// Note: This method may return an empty map for older versions that did not include this field. + pub fn name_to_ids(&self) -> Option> { + if self.meta.column_ids.len() != self.meta.schema.column_schemas.len() { + None + } else { + Some( + self.meta + .column_ids + .iter() + .enumerate() + .map(|(index, id)| (self.meta.schema.column_schemas[index].name.clone(), *id)) + .collect(), + ) + } + } + /// Sort the columns in [RawTableInfo], logical tables require it. 
pub fn sort_columns(&mut self) { let column_schemas = &self.meta.schema.column_schemas; @@ -1181,6 +1209,7 @@ impl RawTableInfo { .map(|index| column_schemas[*index].name.clone()) .collect::>(); + let name_to_ids = self.name_to_ids().unwrap_or_default(); self.meta .schema .column_schemas @@ -1191,6 +1220,7 @@ impl RawTableInfo { let mut timestamp_index = None; let mut value_indices = Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1); + let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len()); for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() { if primary_keys.contains(&column_schema.name) { primary_key_indices.push(index); @@ -1199,12 +1229,16 @@ impl RawTableInfo { } else { value_indices.push(index); } + if let Some(id) = name_to_ids.get(&column_schema.name) { + column_ids.push(*id); + } } // Overwrite table meta self.meta.schema.timestamp_index = timestamp_index; self.meta.primary_key_indices = primary_key_indices; self.meta.value_indices = value_indices; + self.meta.column_ids = column_ids; } /// Extracts region options from table info. diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index ba0162d506..bba3479843 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -81,6 +81,7 @@ impl NumbersTable { options: Default::default(), created_on: Default::default(), partition_key_indices: vec![], + column_ids: vec![], }; let table_info = TableInfoBuilder::default()