diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 24fa5522fa..505abf870b 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -17,6 +17,7 @@ pub mod columns; pub mod create_table; pub mod datanode_handler; pub mod flownode_handler; +pub mod region_metadata; use std::assert_matches::assert_matches; use std::collections::HashMap; diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index 4688cfa987..eb56b7fffc 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use api::region::RegionResponse; +use api::v1::region::region_request::Body; use api::v1::region::RegionRequest; use common_error::ext::{BoxedError, ErrorExt, StackError}; use common_error::status_code::StatusCode; @@ -22,6 +24,8 @@ use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use snafu::{ResultExt, Snafu}; +use store_api::metadata::RegionMetadata; +use store_api::storage::RegionId; use tokio::sync::mpsc; use crate::error::{self, Error, Result}; @@ -277,3 +281,47 @@ impl MockDatanodeHandler for AllFailureDatanodeHandler { unreachable!() } } + +#[derive(Clone)] +pub struct ListMetadataDatanodeHandler { + pub region_metadatas: HashMap>, +} + +impl ListMetadataDatanodeHandler { + pub fn new(region_metadatas: HashMap>) -> Self { + Self { region_metadatas } + } +} + +#[async_trait::async_trait] +impl MockDatanodeHandler for ListMetadataDatanodeHandler { + async fn handle(&self, _peer: &Peer, request: RegionRequest) -> Result { + let Some(Body::ListMetadata(req)) = request.body else { + unreachable!() + }; + let mut response = RegionResponse::new(0); + + let mut output = Vec::with_capacity(req.region_ids.len()); + for region_id in req.region_ids { + match self.region_metadatas.get(&RegionId::from_u64(region_id)) { + Some(metadata) => { + output.push(metadata.clone()); + } + None => { + output.push(None); + } + } + } + + response.metadata = serde_json::to_vec(&output).unwrap(); + Ok(response) + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} diff --git a/src/common/meta/src/ddl/test_util/region_metadata.rs b/src/common/meta/src/ddl/test_util/region_metadata.rs new file mode 100644 index 0000000000..1d84f3549c --- /dev/null +++ b/src/common/meta/src/ddl/test_util/region_metadata.rs @@ -0,0 +1,34 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::SemanticType; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; +use store_api::storage::RegionId; + +/// Builds a region metadata with the given column metadatas. +pub fn build_region_metadata( + region_id: RegionId, + column_metadatas: &[ColumnMetadata], +) -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(region_id); + let mut primary_key = vec![]; + for column_metadata in column_metadatas { + builder.push_column_metadata(column_metadata.clone()); + if column_metadata.semantic_type == SemanticType::Tag { + primary_key.push(column_metadata.column_id); + } + } + builder.primary_key(primary_key); + builder.build().unwrap() +} diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index a5cbfd9b77..ea6d8512c9 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -13,6 +13,8 @@ // limitations under the License. pub(crate) mod raw_table_info; +#[allow(dead_code)] +pub(crate) mod region_metadata_lister; pub(crate) mod table_id; pub(crate) mod table_info; diff --git a/src/common/meta/src/ddl/utils/raw_table_info.rs b/src/common/meta/src/ddl/utils/raw_table_info.rs index 37c7446e2f..812f9185c2 100644 --- a/src/common/meta/src/ddl/utils/raw_table_info.rs +++ b/src/common/meta/src/ddl/utils/raw_table_info.rs @@ -54,7 +54,10 @@ pub(crate) fn build_new_physical_table_info( } } SemanticType::Field => value_indices.push(idx), - SemanticType::Timestamp => *time_index = Some(idx), + SemanticType::Timestamp => { + value_indices.push(idx); + *time_index = Some(idx); + } } columns.push(col.column_schema.clone()); diff --git a/src/common/meta/src/ddl/utils/region_metadata_lister.rs b/src/common/meta/src/ddl/utils/region_metadata_lister.rs new file mode 100644 index 0000000000..30bacd04e7 --- /dev/null +++ b/src/common/meta/src/ddl/utils/region_metadata_lister.rs @@ -0,0 +1,240 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::region::region_request::Body as PbRegionRequest; +use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader}; +use common_telemetry::tracing_context::TracingContext; +use futures::future::join_all; +use snafu::ResultExt; +use store_api::metadata::RegionMetadata; +use store_api::storage::{RegionId, TableId}; + +use crate::ddl::utils::add_peer_context_if_needed; +use crate::error::{DecodeJsonSnafu, Result}; +use crate::node_manager::NodeManagerRef; +use crate::rpc::router::{find_leaders, region_distribution, RegionRoute}; + +/// Collects the region metadata from the datanodes. +pub struct RegionMetadataLister { + node_manager: NodeManagerRef, +} + +impl RegionMetadataLister { + /// Creates a new [`RegionMetadataLister`] with the given [`NodeManagerRef`]. + pub fn new(node_manager: NodeManagerRef) -> Self { + Self { node_manager } + } + + /// Collects the region metadata from the datanodes. + pub async fn list( + &self, + table_id: TableId, + region_routes: &[RegionRoute], + ) -> Result>> { + let region_distribution = region_distribution(region_routes); + let leaders = find_leaders(region_routes) + .into_iter() + .map(|p| (p.id, p)) + .collect::>(); + + let total_num_region = region_distribution + .values() + .map(|r| r.leader_regions.len()) + .sum::(); + + let mut list_metadata_tasks = Vec::with_capacity(leaders.len()); + + // Build requests. + for (datanode_id, region_role_set) in region_distribution { + if region_role_set.leader_regions.is_empty() { + continue; + } + // Safety: must exists. + let peer = leaders.get(&datanode_id).unwrap(); + let requester = self.node_manager.datanode(peer).await; + let region_ids = region_role_set + .leader_regions + .iter() + .map(|r| RegionId::new(table_id, *r).as_u64()) + .collect(); + let request = Self::build_list_metadata_request(region_ids); + + let peer = peer.clone(); + list_metadata_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer)) + }); + } + + let results = join_all(list_metadata_tasks) + .await + .into_iter() + .collect::>>()? + .into_iter() + .map(|r| r.metadata); + + let mut output = Vec::with_capacity(total_num_region); + for result in results { + let region_metadatas: Vec> = + serde_json::from_slice(&result).context(DecodeJsonSnafu)?; + output.extend(region_metadatas); + } + + Ok(output) + } + + fn build_list_metadata_request(region_ids: Vec) -> RegionRequest { + RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(PbRegionRequest::ListMetadata(ListMetadataRequest { + region_ids, + })), + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use api::region::RegionResponse; + use api::v1::meta::Peer; + use api::v1::region::region_request::Body; + use api::v1::region::RegionRequest; + use store_api::metadata::RegionMetadata; + use store_api::storage::RegionId; + use tokio::sync::mpsc; + + use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, ListMetadataDatanodeHandler}; + use crate::ddl::test_util::region_metadata::build_region_metadata; + use crate::ddl::test_util::test_column_metadatas; + use crate::ddl::utils::region_metadata_lister::RegionMetadataLister; + use crate::error::Result; + use crate::rpc::router::{Region, RegionRoute}; + use crate::test_util::MockDatanodeManager; + + fn assert_list_metadata_request(req: RegionRequest, expected_region_ids: &[RegionId]) { + let Some(Body::ListMetadata(req)) = req.body else { + unreachable!() + }; + + assert_eq!(req.region_ids.len(), expected_region_ids.len()); + for region_id in expected_region_ids { + assert!(req.region_ids.contains(®ion_id.as_u64())); + } + } + + fn empty_list_metadata_handler(_peer: Peer, request: RegionRequest) -> Result { + let Some(Body::ListMetadata(req)) = request.body else { + unreachable!() + }; + + let mut output: Vec> = Vec::with_capacity(req.region_ids.len()); + for _region_id in req.region_ids { + output.push(None); + } + + Ok(RegionResponse::from_metadata( + serde_json::to_vec(&output).unwrap(), + )) + } + + #[tokio::test] + async fn test_list_request() { + let (tx, mut rx) = mpsc::channel(8); + let handler = DatanodeWatcher::new(tx).with_handler(empty_list_metadata_handler); + let node_manager = Arc::new(MockDatanodeManager::new(handler)); + let lister = RegionMetadataLister::new(node_manager); + let region_routes = vec![ + RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5)], + leader_state: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![Peer::empty(4)], + leader_state: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 3)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![Peer::empty(4)], + leader_state: None, + leader_down_since: None, + }, + ]; + let region_metadatas = lister.list(1024, ®ion_routes).await.unwrap(); + assert_eq!(region_metadatas.len(), 3); + + let mut requests = vec![]; + for _ in 0..2 { + let (peer, request) = rx.try_recv().unwrap(); + requests.push((peer, request)); + } + rx.try_recv().unwrap_err(); + + let (peer, request) = requests.remove(0); + assert_eq!(peer.id, 1); + assert_list_metadata_request(request, &[RegionId::new(1024, 1)]); + let (peer, request) = requests.remove(0); + assert_eq!(peer.id, 3); + assert_list_metadata_request(request, &[RegionId::new(1024, 2), RegionId::new(1024, 3)]); + } + + #[tokio::test] + async fn test_list_region_metadata() { + let region_metadata = + build_region_metadata(RegionId::new(1024, 1), &test_column_metadatas(&["tag_0"])); + let region_metadatas = HashMap::from([ + (RegionId::new(1024, 0), None), + (RegionId::new(1024, 1), Some(region_metadata.clone())), + ]); + let handler = ListMetadataDatanodeHandler::new(region_metadatas); + let node_manager = Arc::new(MockDatanodeManager::new(handler)); + let lister = RegionMetadataLister::new(node_manager); + let region_routes = vec![ + RegionRoute { + region: Region::new_test(RegionId::new(1024, 0)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + ]; + let region_metadatas = lister.list(1024, ®ion_routes).await.unwrap(); + assert_eq!(region_metadatas.len(), 2); + assert_eq!(region_metadatas[0], None); + assert_eq!(region_metadatas[1], Some(region_metadata)); + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 8abd4982af..14c52453bd 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -877,6 +877,36 @@ pub enum Error { #[snafu(source)] error: object_store::Error, }, + + #[snafu(display( + "Missing column in column metadata: {}, table: {}, table_id: {}", + column_name, + table_name, + table_id, + ))] + MissingColumnInColumnMetadata { + column_name: String, + #[snafu(implicit)] + location: Location, + table_name: String, + table_id: TableId, + }, + + #[snafu(display( + "Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}", + column_name, + column_id, + table_name, + table_id, + ))] + MismatchColumnId { + column_name: String, + column_id: u32, + #[snafu(implicit)] + location: Location, + table_name: String, + table_id: TableId, + }, } pub type Result = std::result::Result; @@ -896,7 +926,10 @@ impl ErrorExt for Error { | DeserializeFromJson { .. } => StatusCode::Internal, NoLeader { .. } => StatusCode::TableUnavailable, - ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected, + ValueNotExist { .. } + | ProcedurePoisonConflict { .. } + | MissingColumnInColumnMetadata { .. } + | MismatchColumnId { .. } => StatusCode::Unexpected, Unsupported { .. } => StatusCode::Unsupported, WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable, diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 50d3cdc8d3..c13cac7e54 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -32,6 +32,7 @@ pub mod key; pub mod kv_backend; pub mod leadership_notifier; pub mod lock_key; +pub mod maintenance; pub mod metrics; pub mod node_expiry_listener; pub mod node_manager; diff --git a/src/common/meta/src/maintenance.rs b/src/common/meta/src/maintenance.rs new file mode 100644 index 0000000000..40d8258012 --- /dev/null +++ b/src/common/meta/src/maintenance.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod reconcile_table; diff --git a/src/common/meta/src/maintenance/reconcile_table.rs b/src/common/meta/src/maintenance/reconcile_table.rs new file mode 100644 index 0000000000..e3ab70b964 --- /dev/null +++ b/src/common/meta/src/maintenance/reconcile_table.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// TODO(weny): Remove it +#[allow(dead_code)] +pub(crate) mod utils; diff --git a/src/common/meta/src/maintenance/reconcile_table/utils.rs b/src/common/meta/src/maintenance/reconcile_table/utils.rs new file mode 100644 index 0000000000..ef4ac5edcb --- /dev/null +++ b/src/common/meta/src/maintenance/reconcile_table/utils.rs @@ -0,0 +1,694 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::fmt; + +use api::v1::SemanticType; +use datatypes::schema::ColumnSchema; +use snafu::{ensure, OptionExt}; +use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::storage::{RegionId, TableId}; +use table::metadata::RawTableMeta; +use table::table_reference::TableReference; + +use crate::error::{ + MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu, +}; + +#[derive(Debug, PartialEq, Eq)] +struct PartialRegionMetadata<'a> { + column_metadatas: &'a [ColumnMetadata], + primary_key: &'a [u32], + table_id: TableId, +} + +impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> { + fn from(region_metadata: &'a RegionMetadata) -> Self { + Self { + column_metadatas: ®ion_metadata.column_metadatas, + primary_key: ®ion_metadata.primary_key, + table_id: region_metadata.region_id.table_id(), + } + } +} + +/// A display wrapper for [`ColumnMetadata`] that formats the column metadata in a more readable way. +struct ColumnMetadataDisplay<'a>(pub &'a ColumnMetadata); + +impl<'a> fmt::Debug for ColumnMetadataDisplay<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let col = self.0; + write!( + f, + "Column {{ name: {}, id: {}, semantic_type: {:?}, data_type: {:?} }}", + col.column_schema.name, col.column_id, col.semantic_type, col.column_schema.data_type, + ) + } +} + +/// Checks if the column metadatas are consistent. +/// +/// The column metadatas are consistent if: +/// - The column metadatas are the same. +/// - The primary key are the same. +/// - The table id of the region metadatas are the same. +/// +/// ## Panic +/// Panic if region_metadatas is empty. +pub(crate) fn check_column_metadatas_consistent( + region_metadatas: &[RegionMetadata], +) -> Option> { + let is_column_metadata_consistent = region_metadatas + .windows(2) + .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1])); + + if !is_column_metadata_consistent { + return None; + } + + Some(region_metadatas[0].column_metadatas.clone()) +} + +/// Resolves column metadata inconsistencies among the given region metadatas +/// by using the column metadata from the metasrv as the source of truth. +/// +/// All region metadatas whose column metadata differs from the given `column_metadatas` +/// will be marked for reconciliation. +/// +/// Returns the region ids that need to be reconciled. +pub(crate) fn resolve_column_metadatas_with_metasrv( + column_metadatas: &[ColumnMetadata], + region_metadatas: &[RegionMetadata], +) -> Result> { + let is_same_table = region_metadatas + .windows(2) + .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id()); + + ensure!( + is_same_table, + UnexpectedSnafu { + err_msg: "Region metadatas are not from the same table" + } + ); + + let mut regions_ids = vec![]; + for region_metadata in region_metadatas { + if region_metadata.column_metadatas != column_metadatas { + let is_invariant_preserved = check_column_metadata_invariants( + column_metadatas, + ®ion_metadata.column_metadatas, + ); + ensure!( + is_invariant_preserved, + UnexpectedSnafu { + err_msg: format!( + "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}", + region_metadata.region_id, + column_metadatas.iter().map(ColumnMetadataDisplay).collect::>(), + region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::>(), + ) + } + ); + regions_ids.push(region_metadata.region_id); + } + } + Ok(regions_ids) +} + +/// Resolves column metadata inconsistencies among the given region metadatas +/// by selecting the column metadata with the highest schema version. +/// +/// This strategy assumes that at most two versions of column metadata may exist, +/// due to the poison mechanism, making the highest schema version a safe choice. +/// +/// Returns the resolved column metadata and the region ids that need to be reconciled. +pub(crate) fn resolve_column_metadatas_with_latest( + region_metadatas: &[RegionMetadata], +) -> Result<(Vec, Vec)> { + let is_same_table = region_metadatas + .windows(2) + .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id()); + + ensure!( + is_same_table, + UnexpectedSnafu { + err_msg: "Region metadatas are not from the same table" + } + ); + + let latest_region_metadata = region_metadatas + .iter() + .max_by_key(|c| c.schema_version) + .context(UnexpectedSnafu { + err_msg: "All Region metadatas have the same schema version", + })?; + let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata); + + let mut region_ids = vec![]; + for region_metadata in region_metadatas { + if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas { + let is_invariant_preserved = check_column_metadata_invariants( + &latest_region_metadata.column_metadatas, + ®ion_metadata.column_metadatas, + ); + ensure!( + is_invariant_preserved, + UnexpectedSnafu { + err_msg: format!( + "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}", + region_metadata.region_id, + latest_column_metadatas.column_metadatas.iter().map(ColumnMetadataDisplay).collect::>(), + region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::>() + ) + } + ); + region_ids.push(region_metadata.region_id); + } + } + + // TODO(weny): verify the new column metadatas are acceptable for regions. + Ok((latest_region_metadata.column_metadatas.clone(), region_ids)) +} + +/// Constructs a vector of [`ColumnMetadata`] from the provided table information. +/// +/// This function maps each [`ColumnSchema`] to its corresponding [`ColumnMetadata`] by +/// determining the semantic type (Tag, Timestamp, or Field) and retrieving the column ID +/// from the `name_to_ids` mapping. +/// +/// Returns an error if any column name is missing in the mapping. +pub(crate) fn build_column_metadata_from_table_info( + column_schemas: &[ColumnSchema], + primary_key_indexes: &[usize], + name_to_ids: &HashMap, +) -> Result> { + let primary_names = primary_key_indexes + .iter() + .map(|i| column_schemas[*i].name.as_str()) + .collect::>(); + + column_schemas + .iter() + .map(|column_schema| { + let column_id = *name_to_ids + .get(column_schema.name.as_str()) + .with_context(|| UnexpectedSnafu { + err_msg: format!( + "Column name {} not found in name_to_ids", + column_schema.name + ), + })?; + + let semantic_type = if primary_names.contains(&column_schema.name.as_str()) { + SemanticType::Tag + } else if column_schema.is_time_index() { + SemanticType::Timestamp + } else { + SemanticType::Field + }; + Ok(ColumnMetadata { + column_schema: column_schema.clone(), + semantic_type, + column_id, + }) + }) + .collect::>>() +} + +/// Checks whether the schema invariants hold between the existing and new column metadata. +/// +/// Invariants: +/// - Primary key (Tag) columns must exist in the new metadata, with identical name and ID. +/// - Timestamp column must remain exactly the same in name and ID. +pub(crate) fn check_column_metadata_invariants( + new_column_metadatas: &[ColumnMetadata], + column_metadatas: &[ColumnMetadata], +) -> bool { + let new_primary_keys = new_column_metadatas + .iter() + .filter(|c| c.semantic_type == SemanticType::Tag) + .map(|c| (c.column_schema.name.as_str(), c.column_id)) + .collect::>(); + + let old_primary_keys = column_metadatas + .iter() + .filter(|c| c.semantic_type == SemanticType::Tag) + .map(|c| (c.column_schema.name.as_str(), c.column_id)); + + for (name, id) in old_primary_keys { + if new_primary_keys.get(name) != Some(&id) { + return false; + } + } + + let new_ts_column = new_column_metadatas + .iter() + .find(|c| c.semantic_type == SemanticType::Timestamp) + .map(|c| (c.column_schema.name.as_str(), c.column_id)); + + let old_ts_column = column_metadatas + .iter() + .find(|c| c.semantic_type == SemanticType::Timestamp) + .map(|c| (c.column_schema.name.as_str(), c.column_id)); + + new_ts_column == old_ts_column +} + +/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. +/// +/// Returns an error if: +/// - Any column is missing in the `name_to_ids`. +/// - The column id in table metadata is not the same as the column id in the column metadata. +/// - The table index is missing in the column metadata. +/// - The primary key or partition key columns are missing in the column metadata. +pub(crate) fn build_table_meta_from_column_metadatas( + table_id: TableId, + table_ref: TableReference, + table_meta: &RawTableMeta, + name_to_ids: &HashMap, + column_metadata: &[ColumnMetadata], +) -> Result { + let column_in_column_metadata = column_metadata + .iter() + .map(|c| (c.column_schema.name.as_str(), c)) + .collect::>(); + let primary_key_names = table_meta + .primary_key_indices + .iter() + .map(|i| table_meta.schema.column_schemas[*i].name.as_str()) + .collect::>(); + let partition_key_names = table_meta + .partition_key_indices + .iter() + .map(|i| table_meta.schema.column_schemas[*i].name.as_str()) + .collect::>(); + ensure!( + column_metadata + .iter() + .any(|c| c.semantic_type == SemanticType::Timestamp), + UnexpectedSnafu { + err_msg: format!( + "Missing table index in column metadata, table: {}, table_id: {}", + table_ref, table_id + ), + } + ); + + // Ensures all primary key and partition key exists in the column metadata. + for column_name in primary_key_names.iter().chain(partition_key_names.iter()) { + let column_in_column_metadata = + column_in_column_metadata + .get(column_name) + .with_context(|| MissingColumnInColumnMetadataSnafu { + column_name: column_name.to_string(), + table_name: table_ref.to_string(), + table_id, + })?; + + let column_id = *name_to_ids + .get(*column_name) + .with_context(|| UnexpectedSnafu { + err_msg: format!("column id not found in name_to_ids: {}", column_name), + })?; + ensure!( + column_id == column_in_column_metadata.column_id, + MismatchColumnIdSnafu { + column_name: column_name.to_string(), + column_id, + table_name: table_ref.to_string(), + table_id, + } + ); + } + + let mut new_raw_table_meta = table_meta.clone(); + let primary_key_indices = &mut new_raw_table_meta.primary_key_indices; + let partition_key_indices = &mut new_raw_table_meta.partition_key_indices; + let value_indices = &mut new_raw_table_meta.value_indices; + let time_index = &mut new_raw_table_meta.schema.timestamp_index; + let columns = &mut new_raw_table_meta.schema.column_schemas; + let column_ids = &mut new_raw_table_meta.column_ids; + + column_ids.clear(); + value_indices.clear(); + columns.clear(); + primary_key_indices.clear(); + partition_key_indices.clear(); + + for (idx, col) in column_metadata.iter().enumerate() { + if partition_key_names.contains(&col.column_schema.name.as_str()) { + partition_key_indices.push(idx); + } + match col.semantic_type { + SemanticType::Tag => { + primary_key_indices.push(idx); + } + SemanticType::Field => { + value_indices.push(idx); + } + SemanticType::Timestamp => { + value_indices.push(idx); + *time_index = Some(idx); + } + } + + columns.push(col.column_schema.clone()); + column_ids.push(col.column_id); + } + + if let Some(time_index) = *time_index { + new_raw_table_meta.schema.column_schemas[time_index].set_time_index(); + } + + Ok(new_raw_table_meta) +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::SemanticType; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; + use store_api::metadata::ColumnMetadata; + use table::metadata::{RawTableMeta, TableMetaBuilder}; + use table::table_reference::TableReference; + + use super::*; + use crate::ddl::test_util::region_metadata::build_region_metadata; + use crate::error::Error; + + fn new_test_schema() -> Schema { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + ]; + SchemaBuilder::try_from(column_schemas) + .unwrap() + .version(123) + .build() + .unwrap() + } + + fn new_test_column_metadatas() -> Vec { + vec![ + ColumnMetadata { + column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 0, + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_time_index(true), + semantic_type: SemanticType::Timestamp, + column_id: 1, + }, + ColumnMetadata { + column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 2, + }, + ] + } + + fn new_test_raw_table_info() -> RawTableMeta { + let mut table_meta_builder = TableMetaBuilder::empty(); + let table_meta = table_meta_builder + .schema(Arc::new(new_test_schema())) + .primary_key_indices(vec![0]) + .partition_key_indices(vec![2]) + .next_column_id(4) + .build() + .unwrap(); + + table_meta.into() + } + + #[test] + fn test_build_table_info_from_column_metadatas() { + let mut column_metadatas = new_test_column_metadatas(); + column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 3, + }); + + let table_id = 1; + let table_ref = TableReference::full("test_catalog", "test_schema", "test_table"); + let table_meta = new_test_raw_table_info(); + let name_to_ids = HashMap::from([ + ("col1".to_string(), 0), + ("ts".to_string(), 1), + ("col2".to_string(), 2), + ]); + + let new_table_meta = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_meta, + &name_to_ids, + &column_metadatas, + ) + .unwrap(); + + assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]); + assert_eq!(new_table_meta.partition_key_indices, vec![2]); + assert_eq!(new_table_meta.value_indices, vec![1, 2]); + assert_eq!(new_table_meta.schema.timestamp_index, Some(1)); + assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]); + } + + #[test] + fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() { + let column_metadatas = new_test_column_metadatas(); + let table_id = 1; + let table_ref = TableReference::full("test_catalog", "test_schema", "test_table"); + let table_meta = new_test_raw_table_info(); + let name_to_ids = HashMap::from([ + ("col1".to_string(), 0), + ("ts".to_string(), 1), + // Change column id of col2 to 3. + ("col2".to_string(), 3), + ]); + + let err = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_meta, + &name_to_ids, + &column_metadatas, + ) + .unwrap_err(); + + assert_matches!(err, Error::MismatchColumnId { .. }); + } + + #[test] + fn test_build_table_info_from_column_metadatas_with_missing_time_index() { + let mut column_metadatas = new_test_column_metadatas(); + column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp); + let table_id = 1; + let table_ref = TableReference::full("test_catalog", "test_schema", "test_table"); + let table_meta = new_test_raw_table_info(); + let name_to_ids = HashMap::from([ + ("col1".to_string(), 0), + ("ts".to_string(), 1), + ("col2".to_string(), 2), + ]); + + let err = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_meta, + &name_to_ids, + &column_metadatas, + ) + .unwrap_err(); + + assert!( + err.to_string() + .contains("Missing table index in column metadata"), + "err: {}", + err + ); + } + + #[test] + fn test_build_table_info_from_column_metadatas_with_missing_column() { + let mut column_metadatas = new_test_column_metadatas(); + // Remove primary key column. + column_metadatas.retain(|c| c.column_id != 0); + let table_id = 1; + let table_ref = TableReference::full("test_catalog", "test_schema", "test_table"); + let table_meta = new_test_raw_table_info(); + let name_to_ids = HashMap::from([ + ("col1".to_string(), 0), + ("ts".to_string(), 1), + ("col2".to_string(), 2), + ]); + + let err = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_meta, + &name_to_ids, + &column_metadatas, + ) + .unwrap_err(); + assert_matches!(err, Error::MissingColumnInColumnMetadata { .. }); + + let mut column_metadatas = new_test_column_metadatas(); + // Remove partition key column. + column_metadatas.retain(|c| c.column_id != 2); + + let err = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_meta, + &name_to_ids, + &column_metadatas, + ) + .unwrap_err(); + assert_matches!(err, Error::MissingColumnInColumnMetadata { .. }); + } + + #[test] + fn test_check_column_metadatas_consistent() { + let column_metadatas = new_test_column_metadatas(); + let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas); + let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas); + let result = + check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap(); + assert_eq!(result, column_metadatas); + + let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas); + let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas); + let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]); + assert!(result.is_none()); + } + + #[test] + fn test_check_column_metadata_invariants() { + let column_metadatas = new_test_column_metadatas(); + let mut new_column_metadatas = column_metadatas.clone(); + new_column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 3, + }); + assert!(check_column_metadata_invariants( + &new_column_metadatas, + &column_metadatas + )); + } + + #[test] + fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() { + let column_metadatas = new_test_column_metadatas(); + let mut new_column_metadatas = column_metadatas.clone(); + new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp); + assert!(!check_column_metadata_invariants( + &new_column_metadatas, + &column_metadatas + )); + + let column_metadatas = new_test_column_metadatas(); + let mut new_column_metadatas = column_metadatas.clone(); + new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag); + assert!(!check_column_metadata_invariants( + &new_column_metadatas, + &column_metadatas + )); + } + + #[test] + fn test_check_column_metadata_invariants_mismatch_column_id() { + let column_metadatas = new_test_column_metadatas(); + let mut new_column_metadatas = column_metadatas.clone(); + if let Some(col) = new_column_metadatas + .iter_mut() + .find(|c| c.semantic_type == SemanticType::Timestamp) + { + col.column_id = 100; + } + assert!(!check_column_metadata_invariants( + &new_column_metadatas, + &column_metadatas + )); + + let column_metadatas = new_test_column_metadatas(); + let mut new_column_metadatas = column_metadatas.clone(); + if let Some(col) = new_column_metadatas + .iter_mut() + .find(|c| c.semantic_type == SemanticType::Tag) + { + col.column_id = 100; + } + assert!(!check_column_metadata_invariants( + &new_column_metadatas, + &column_metadatas + )); + } + + #[test] + fn test_resolve_column_metadatas_with_use_metasrv_strategy() { + let column_metadatas = new_test_column_metadatas(); + let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas); + let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone(); + metasrv_column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 3, + }); + let result = + resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1]) + .unwrap(); + + assert_eq!(result, vec![RegionId::new(1024, 0)]); + } + + #[test] + fn test_resolve_column_metadatas_with_use_latest_strategy() { + let column_metadatas = new_test_column_metadatas(); + let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas); + let mut new_column_metadatas = column_metadatas.clone(); + new_column_metadatas.push(ColumnMetadata { + column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 3, + }); + + let mut region_metadata2 = + build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas); + region_metadata2.schema_version = 2; + + let (resolved_column_metadatas, region_ids) = + resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap(); + assert_eq!(region_ids, vec![RegionId::new(1024, 0)]); + assert_eq!(resolved_column_metadatas, new_column_metadatas); + } +} diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 6a46d5e74b..fc46ba9c9c 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -1424,6 +1424,6 @@ mod tests { create_table_task.table_info.meta.primary_key_indices, vec![2] ); - assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]); + assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]); } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 48e041a5df..39083cfec8 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -38,6 +38,7 @@ use crate::requests::{ AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions, UnsetIndexOptions, }; +use crate::table_reference::TableReference; pub type TableId = u32; pub type TableVersion = u64; @@ -1113,7 +1114,8 @@ pub struct RawTableMeta { /// The indices of columns in primary key. Note that the index of timestamp column /// is not included. Order matters to this array. pub primary_key_indices: Vec, - /// The indices of columns in value. Order doesn't matter to this array. + /// The indices of columns in value. The index of timestamp column is included. + /// Order doesn't matter to this array. pub value_indices: Vec, /// Engine type of this table. Usually in small case. pub engine: String, @@ -1219,12 +1221,13 @@ impl RawTableInfo { let mut primary_key_indices = Vec::with_capacity(primary_keys.len()); let mut timestamp_index = None; let mut value_indices = - Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1); + Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len()); let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len()); for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() { if primary_keys.contains(&column_schema.name) { primary_key_indices.push(index); } else if column_schema.is_time_index() { + value_indices.push(index); timestamp_index = Some(index); } else { value_indices.push(index); @@ -1247,6 +1250,15 @@ impl RawTableInfo { pub fn to_region_options(&self) -> HashMap { HashMap::from(&self.meta.options) } + + /// Returns the table reference. + pub fn table_ref(&self) -> TableReference { + TableReference::full( + self.catalog_name.as_str(), + self.schema_name.as_str(), + self.name.as_str(), + ) + } } impl From for RawTableInfo {