feat: add table reconciliation utilities (#6519)

* feat: add table reconciliation utilities

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update comment

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-07-17 16:05:38 +08:00
committed by WenyXu
parent c9501053e5
commit c0f40ce8ed
13 changed files with 1105 additions and 5 deletions

View File

@@ -17,6 +17,7 @@ pub mod columns;
pub mod create_table;
pub mod datanode_handler;
pub mod flownode_handler;
pub mod region_metadata;
use std::assert_matches::assert_matches;
use std::collections::HashMap;

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::region::region_request::Body;
use api::v1::region::RegionRequest;
use common_error::ext::{BoxedError, ErrorExt, StackError};
use common_error::status_code::StatusCode;
@@ -22,6 +24,8 @@ use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::debug;
use snafu::{ResultExt, Snafu};
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::error::{self, Error, Result};
@@ -277,3 +281,47 @@ impl MockDatanodeHandler for AllFailureDatanodeHandler {
unreachable!()
}
}
#[derive(Clone)]
pub struct ListMetadataDatanodeHandler {
pub region_metadatas: HashMap<RegionId, Option<RegionMetadata>>,
}
impl ListMetadataDatanodeHandler {
pub fn new(region_metadatas: HashMap<RegionId, Option<RegionMetadata>>) -> Self {
Self { region_metadatas }
}
}
#[async_trait::async_trait]
impl MockDatanodeHandler for ListMetadataDatanodeHandler {
async fn handle(&self, _peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
let Some(Body::ListMetadata(req)) = request.body else {
unreachable!()
};
let mut response = RegionResponse::new(0);
let mut output = Vec::with_capacity(req.region_ids.len());
for region_id in req.region_ids {
match self.region_metadatas.get(&RegionId::from_u64(region_id)) {
Some(metadata) => {
output.push(metadata.clone());
}
None => {
output.push(None);
}
}
}
response.metadata = serde_json::to_vec(&output).unwrap();
Ok(response)
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
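
The handler above answers a ListMetadata request by JSON-encoding a `Vec<Option<RegionMetadata>>` into `response.metadata`, one entry per requested region id and in the same order. A minimal sketch of decoding that payload on the caller side (it mirrors what `RegionMetadataLister::list` does further down; `response` is assumed to be the `RegionResponse` returned by the handler):

// Sketch: decode the payload produced by ListMetadataDatanodeHandler.
// Entries are `None` for region ids the datanode has no metadata for.
let decoded: Vec<Option<RegionMetadata>> =
    serde_json::from_slice(&response.metadata).expect("valid JSON payload");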

View File

@@ -0,0 +1,34 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::SemanticType;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
/// Builds a region metadata with the given column metadatas.
pub fn build_region_metadata(
region_id: RegionId,
column_metadatas: &[ColumnMetadata],
) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(region_id);
let mut primary_key = vec![];
for column_metadata in column_metadatas {
builder.push_column_metadata(column_metadata.clone());
if column_metadata.semantic_type == SemanticType::Tag {
primary_key.push(column_metadata.column_id);
}
}
builder.primary_key(primary_key);
builder.build().unwrap()
}

View File

@@ -13,6 +13,8 @@
// limitations under the License.
pub(crate) mod raw_table_info;
#[allow(dead_code)]
pub(crate) mod region_metadata_lister;
pub(crate) mod table_id;
pub(crate) mod table_info;

View File

@@ -54,7 +54,10 @@ pub(crate) fn build_new_physical_table_info(
}
}
SemanticType::Field => value_indices.push(idx),
SemanticType::Timestamp => *time_index = Some(idx),
SemanticType::Timestamp => {
value_indices.push(idx);
*time_index = Some(idx);
}
}
columns.push(col.column_schema.clone());
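
With this change the timestamp column is counted among the value columns as well: in the updated create-table test near the end of this diff, `value_indices` changes from `[1]` to `[0, 1]` because the time index is now included, matching the updated `RawTableMeta::value_indices` documentation.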

View File

@@ -0,0 +1,240 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader};
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use snafu::ResultExt;
use store_api::metadata::RegionMetadata;
use store_api::storage::{RegionId, TableId};
use crate::ddl::utils::add_peer_context_if_needed;
use crate::error::{DecodeJsonSnafu, Result};
use crate::node_manager::NodeManagerRef;
use crate::rpc::router::{find_leaders, region_distribution, RegionRoute};
/// Collects the region metadata from the datanodes.
pub struct RegionMetadataLister {
node_manager: NodeManagerRef,
}
impl RegionMetadataLister {
/// Creates a new [`RegionMetadataLister`] with the given [`NodeManagerRef`].
pub fn new(node_manager: NodeManagerRef) -> Self {
Self { node_manager }
}
/// Collects the region metadata from the datanodes.
pub async fn list(
&self,
table_id: TableId,
region_routes: &[RegionRoute],
) -> Result<Vec<Option<RegionMetadata>>> {
let region_distribution = region_distribution(region_routes);
let leaders = find_leaders(region_routes)
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
let total_num_region = region_distribution
.values()
.map(|r| r.leader_regions.len())
.sum::<usize>();
let mut list_metadata_tasks = Vec::with_capacity(leaders.len());
// Build requests.
for (datanode_id, region_role_set) in region_distribution {
if region_role_set.leader_regions.is_empty() {
continue;
}
// Safety: must exist.
let peer = leaders.get(&datanode_id).unwrap();
let requester = self.node_manager.datanode(peer).await;
let region_ids = region_role_set
.leader_regions
.iter()
.map(|r| RegionId::new(table_id, *r).as_u64())
.collect();
let request = Self::build_list_metadata_request(region_ids);
let peer = peer.clone();
list_metadata_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer))
});
}
let results = join_all(list_metadata_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|r| r.metadata);
let mut output = Vec::with_capacity(total_num_region);
for result in results {
let region_metadatas: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&result).context(DecodeJsonSnafu)?;
output.extend(region_metadatas);
}
Ok(output)
}
fn build_list_metadata_request(region_ids: Vec<u64>) -> RegionRequest {
RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(PbRegionRequest::ListMetadata(ListMetadataRequest {
region_ids,
})),
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::meta::Peer;
use api::v1::region::region_request::Body;
use api::v1::region::RegionRequest;
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use tokio::sync::mpsc;
use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, ListMetadataDatanodeHandler};
use crate::ddl::test_util::region_metadata::build_region_metadata;
use crate::ddl::test_util::test_column_metadatas;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::error::Result;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::MockDatanodeManager;
fn assert_list_metadata_request(req: RegionRequest, expected_region_ids: &[RegionId]) {
let Some(Body::ListMetadata(req)) = req.body else {
unreachable!()
};
assert_eq!(req.region_ids.len(), expected_region_ids.len());
for region_id in expected_region_ids {
assert!(req.region_ids.contains(&region_id.as_u64()));
}
}
fn empty_list_metadata_handler(_peer: Peer, request: RegionRequest) -> Result<RegionResponse> {
let Some(Body::ListMetadata(req)) = request.body else {
unreachable!()
};
let mut output: Vec<Option<RegionMetadata>> = Vec::with_capacity(req.region_ids.len());
for _region_id in req.region_ids {
output.push(None);
}
Ok(RegionResponse::from_metadata(
serde_json::to_vec(&output).unwrap(),
))
}
#[tokio::test]
async fn test_list_request() {
let (tx, mut rx) = mpsc::channel(8);
let handler = DatanodeWatcher::new(tx).with_handler(empty_list_metadata_handler);
let node_manager = Arc::new(MockDatanodeManager::new(handler));
let lister = RegionMetadataLister::new(node_manager);
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
];
let region_metadatas = lister.list(1024, &region_routes).await.unwrap();
assert_eq!(region_metadatas.len(), 3);
let mut requests = vec![];
for _ in 0..2 {
let (peer, request) = rx.try_recv().unwrap();
requests.push((peer, request));
}
rx.try_recv().unwrap_err();
let (peer, request) = requests.remove(0);
assert_eq!(peer.id, 1);
assert_list_metadata_request(request, &[RegionId::new(1024, 1)]);
let (peer, request) = requests.remove(0);
assert_eq!(peer.id, 3);
assert_list_metadata_request(request, &[RegionId::new(1024, 2), RegionId::new(1024, 3)]);
}
#[tokio::test]
async fn test_list_region_metadata() {
let region_metadata =
build_region_metadata(RegionId::new(1024, 1), &test_column_metadatas(&["tag_0"]));
let region_metadatas = HashMap::from([
(RegionId::new(1024, 0), None),
(RegionId::new(1024, 1), Some(region_metadata.clone())),
]);
let handler = ListMetadataDatanodeHandler::new(region_metadatas);
let node_manager = Arc::new(MockDatanodeManager::new(handler));
let lister = RegionMetadataLister::new(node_manager);
let region_routes = vec![
RegionRoute {
region: Region::new_test(RegionId::new(1024, 0)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
];
let region_metadatas = lister.list(1024, &region_routes).await.unwrap();
assert_eq!(region_metadatas.len(), 2);
assert_eq!(region_metadatas[0], None);
assert_eq!(region_metadatas[1], Some(region_metadata));
}
}
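
For orientation, a minimal usage sketch of the lister, mirroring the tests above; `node_manager` stands in for a `NodeManagerRef` the caller already holds, and `table_id`/`region_routes` for the table id and routes it has loaded from the table route metadata:

// Sketch only: assumes `node_manager`, `table_id` and `region_routes` are in scope.
let lister = RegionMetadataLister::new(node_manager);
// One entry per leader region; `None` marks regions whose metadata
// was not found on the datanode.
let metadatas: Vec<Option<RegionMetadata>> = lister.list(table_id, &region_routes).await?;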

View File

@@ -877,6 +877,36 @@ pub enum Error {
#[snafu(source)]
error: object_store::Error,
},
#[snafu(display(
"Missing column in column metadata: {}, table: {}, table_id: {}",
column_name,
table_name,
table_id,
))]
MissingColumnInColumnMetadata {
column_name: String,
#[snafu(implicit)]
location: Location,
table_name: String,
table_id: TableId,
},
#[snafu(display(
"Mismatch column id: column_name: {}, column_id: {}, table: {}, table_id: {}",
column_name,
column_id,
table_name,
table_id,
))]
MismatchColumnId {
column_name: String,
column_id: u32,
#[snafu(implicit)]
location: Location,
table_name: String,
table_id: TableId,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -896,7 +926,10 @@ impl ErrorExt for Error {
| DeserializeFromJson { .. } => StatusCode::Internal,
NoLeader { .. } => StatusCode::TableUnavailable,
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
ValueNotExist { .. }
| ProcedurePoisonConflict { .. }
| MissingColumnInColumnMetadata { .. }
| MismatchColumnId { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,

View File

@@ -32,6 +32,7 @@ pub mod key;
pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod maintenance;
pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod reconcile_table;

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod utils;

View File

@@ -0,0 +1,694 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::fmt;
use api::v1::SemanticType;
use datatypes::schema::ColumnSchema;
use snafu::{ensure, OptionExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableMeta;
use table::table_reference::TableReference;
use crate::error::{
MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
};
#[derive(Debug, PartialEq, Eq)]
struct PartialRegionMetadata<'a> {
column_metadatas: &'a [ColumnMetadata],
primary_key: &'a [u32],
table_id: TableId,
}
impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
fn from(region_metadata: &'a RegionMetadata) -> Self {
Self {
column_metadatas: &region_metadata.column_metadatas,
primary_key: &region_metadata.primary_key,
table_id: region_metadata.region_id.table_id(),
}
}
}
/// A display wrapper for [`ColumnMetadata`] that formats the column metadata in a more readable way.
struct ColumnMetadataDisplay<'a>(pub &'a ColumnMetadata);
impl<'a> fmt::Debug for ColumnMetadataDisplay<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let col = self.0;
write!(
f,
"Column {{ name: {}, id: {}, semantic_type: {:?}, data_type: {:?} }}",
col.column_schema.name, col.column_id, col.semantic_type, col.column_schema.data_type,
)
}
}
/// Checks if the column metadatas are consistent.
///
/// The column metadatas are consistent if:
/// - The column metadatas are the same.
/// - The primary keys are the same.
/// - The table id of the region metadatas is the same.
///
/// # Panics
/// Panics if `region_metadatas` is empty.
pub(crate) fn check_column_metadatas_consistent(
region_metadatas: &[RegionMetadata],
) -> Option<Vec<ColumnMetadata>> {
let is_column_metadata_consistent = region_metadatas
.windows(2)
.all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
if !is_column_metadata_consistent {
return None;
}
Some(region_metadatas[0].column_metadatas.clone())
}
/// Resolves column metadata inconsistencies among the given region metadatas
/// by using the column metadata from the metasrv as the source of truth.
///
/// All region metadatas whose column metadata differs from the given `column_metadatas`
/// will be marked for reconciliation.
///
/// Returns the region ids that need to be reconciled.
pub(crate) fn resolve_column_metadatas_with_metasrv(
column_metadatas: &[ColumnMetadata],
region_metadatas: &[RegionMetadata],
) -> Result<Vec<RegionId>> {
let is_same_table = region_metadatas
.windows(2)
.all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
ensure!(
is_same_table,
UnexpectedSnafu {
err_msg: "Region metadatas are not from the same table"
}
);
let mut region_ids = vec![];
for region_metadata in region_metadatas {
if region_metadata.column_metadatas != column_metadatas {
let is_invariant_preserved = check_column_metadata_invariants(
column_metadatas,
&region_metadata.column_metadatas,
);
ensure!(
is_invariant_preserved,
UnexpectedSnafu {
err_msg: format!(
"Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
region_metadata.region_id,
column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
)
}
);
region_ids.push(region_metadata.region_id);
}
}
Ok(region_ids)
}
/// Resolves column metadata inconsistencies among the given region metadatas
/// by selecting the column metadata with the highest schema version.
///
/// This strategy assumes that at most two versions of column metadata may exist,
/// due to the poison mechanism, making the highest schema version a safe choice.
///
/// Returns the resolved column metadata and the region ids that need to be reconciled.
pub(crate) fn resolve_column_metadatas_with_latest(
region_metadatas: &[RegionMetadata],
) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
let is_same_table = region_metadatas
.windows(2)
.all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
ensure!(
is_same_table,
UnexpectedSnafu {
err_msg: "Region metadatas are not from the same table"
}
);
let latest_region_metadata = region_metadatas
.iter()
.max_by_key(|c| c.schema_version)
.context(UnexpectedSnafu {
err_msg: "All Region metadatas have the same schema version",
})?;
let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
let mut region_ids = vec![];
for region_metadata in region_metadatas {
if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
let is_invariant_preserved = check_column_metadata_invariants(
&latest_region_metadata.column_metadatas,
&region_metadata.column_metadatas,
);
ensure!(
is_invariant_preserved,
UnexpectedSnafu {
err_msg: format!(
"Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
region_metadata.region_id,
latest_column_metadatas.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>()
)
}
);
region_ids.push(region_metadata.region_id);
}
}
// TODO(weny): verify the new column metadatas are acceptable for regions.
Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
}
/// Constructs a vector of [`ColumnMetadata`] from the provided table information.
///
/// This function maps each [`ColumnSchema`] to its corresponding [`ColumnMetadata`] by
/// determining the semantic type (Tag, Timestamp, or Field) and retrieving the column ID
/// from the `name_to_ids` mapping.
///
/// Returns an error if any column name is missing in the mapping.
pub(crate) fn build_column_metadata_from_table_info(
column_schemas: &[ColumnSchema],
primary_key_indexes: &[usize],
name_to_ids: &HashMap<String, u32>,
) -> Result<Vec<ColumnMetadata>> {
let primary_names = primary_key_indexes
.iter()
.map(|i| column_schemas[*i].name.as_str())
.collect::<HashSet<_>>();
column_schemas
.iter()
.map(|column_schema| {
let column_id = *name_to_ids
.get(column_schema.name.as_str())
.with_context(|| UnexpectedSnafu {
err_msg: format!(
"Column name {} not found in name_to_ids",
column_schema.name
),
})?;
let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
SemanticType::Tag
} else if column_schema.is_time_index() {
SemanticType::Timestamp
} else {
SemanticType::Field
};
Ok(ColumnMetadata {
column_schema: column_schema.clone(),
semantic_type,
column_id,
})
})
.collect::<Result<Vec<_>>>()
}
/// Checks whether the schema invariants hold between the existing and new column metadata.
///
/// Invariants:
/// - Primary key (Tag) columns must exist in the new metadata, with identical name and ID.
/// - Timestamp column must remain exactly the same in name and ID.
pub(crate) fn check_column_metadata_invariants(
new_column_metadatas: &[ColumnMetadata],
column_metadatas: &[ColumnMetadata],
) -> bool {
let new_primary_keys = new_column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.collect::<HashMap<_, _>>();
let old_primary_keys = column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
.map(|c| (c.column_schema.name.as_str(), c.column_id));
for (name, id) in old_primary_keys {
if new_primary_keys.get(name) != Some(&id) {
return false;
}
}
let new_ts_column = new_column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id));
let old_ts_column = column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id));
new_ts_column == old_ts_column
}
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
///
/// Returns an error if:
/// - Any column is missing in the `name_to_ids`.
/// - The column id in table metadata is not the same as the column id in the column metadata.
/// - The table index is missing in the column metadata.
/// - The primary key or partition key columns are missing in the column metadata.
pub(crate) fn build_table_meta_from_column_metadatas(
table_id: TableId,
table_ref: TableReference,
table_meta: &RawTableMeta,
name_to_ids: &HashMap<String, u32>,
column_metadata: &[ColumnMetadata],
) -> Result<RawTableMeta> {
let column_in_column_metadata = column_metadata
.iter()
.map(|c| (c.column_schema.name.as_str(), c))
.collect::<HashMap<_, _>>();
let primary_key_names = table_meta
.primary_key_indices
.iter()
.map(|i| table_meta.schema.column_schemas[*i].name.as_str())
.collect::<HashSet<_>>();
let partition_key_names = table_meta
.partition_key_indices
.iter()
.map(|i| table_meta.schema.column_schemas[*i].name.as_str())
.collect::<HashSet<_>>();
ensure!(
column_metadata
.iter()
.any(|c| c.semantic_type == SemanticType::Timestamp),
UnexpectedSnafu {
err_msg: format!(
"Missing table index in column metadata, table: {}, table_id: {}",
table_ref, table_id
),
}
);
// Ensures all primary key and partition key exists in the column metadata.
for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
let column_in_column_metadata =
column_in_column_metadata
.get(column_name)
.with_context(|| MissingColumnInColumnMetadataSnafu {
column_name: column_name.to_string(),
table_name: table_ref.to_string(),
table_id,
})?;
let column_id = *name_to_ids
.get(*column_name)
.with_context(|| UnexpectedSnafu {
err_msg: format!("column id not found in name_to_ids: {}", column_name),
})?;
ensure!(
column_id == column_in_column_metadata.column_id,
MismatchColumnIdSnafu {
column_name: column_name.to_string(),
column_id,
table_name: table_ref.to_string(),
table_id,
}
);
}
let mut new_raw_table_meta = table_meta.clone();
let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
let value_indices = &mut new_raw_table_meta.value_indices;
let time_index = &mut new_raw_table_meta.schema.timestamp_index;
let columns = &mut new_raw_table_meta.schema.column_schemas;
let column_ids = &mut new_raw_table_meta.column_ids;
column_ids.clear();
value_indices.clear();
columns.clear();
primary_key_indices.clear();
partition_key_indices.clear();
for (idx, col) in column_metadata.iter().enumerate() {
if partition_key_names.contains(&col.column_schema.name.as_str()) {
partition_key_indices.push(idx);
}
match col.semantic_type {
SemanticType::Tag => {
primary_key_indices.push(idx);
}
SemanticType::Field => {
value_indices.push(idx);
}
SemanticType::Timestamp => {
value_indices.push(idx);
*time_index = Some(idx);
}
}
columns.push(col.column_schema.clone());
column_ids.push(col.column_id);
}
if let Some(time_index) = *time_index {
new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
}
Ok(new_raw_table_meta)
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use store_api::metadata::ColumnMetadata;
use table::metadata::{RawTableMeta, TableMetaBuilder};
use table::table_reference::TableReference;
use super::*;
use crate::ddl::test_util::region_metadata::build_region_metadata;
use crate::error::Error;
fn new_test_schema() -> Schema {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
];
SchemaBuilder::try_from(column_schemas)
.unwrap()
.version(123)
.build()
.unwrap()
}
fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
vec![
ColumnMetadata {
column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 0,
},
ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
semantic_type: SemanticType::Timestamp,
column_id: 1,
},
ColumnMetadata {
column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 2,
},
]
}
fn new_test_raw_table_info() -> RawTableMeta {
let mut table_meta_builder = TableMetaBuilder::empty();
let table_meta = table_meta_builder
.schema(Arc::new(new_test_schema()))
.primary_key_indices(vec![0])
.partition_key_indices(vec![2])
.next_column_id(4)
.build()
.unwrap();
table_meta.into()
}
#[test]
fn test_build_table_info_from_column_metadatas() {
let mut column_metadatas = new_test_column_metadatas();
column_metadatas.push(ColumnMetadata {
column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Tag,
column_id: 3,
});
let table_id = 1;
let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
let table_meta = new_test_raw_table_info();
let name_to_ids = HashMap::from([
("col1".to_string(), 0),
("ts".to_string(), 1),
("col2".to_string(), 2),
]);
let new_table_meta = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_meta,
&name_to_ids,
&column_metadatas,
)
.unwrap();
assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
assert_eq!(new_table_meta.partition_key_indices, vec![2]);
assert_eq!(new_table_meta.value_indices, vec![1, 2]);
assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]);
}
#[test]
fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
let column_metadatas = new_test_column_metadatas();
let table_id = 1;
let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
let table_meta = new_test_raw_table_info();
let name_to_ids = HashMap::from([
("col1".to_string(), 0),
("ts".to_string(), 1),
// Change column id of col2 to 3.
("col2".to_string(), 3),
]);
let err = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_meta,
&name_to_ids,
&column_metadatas,
)
.unwrap_err();
assert_matches!(err, Error::MismatchColumnId { .. });
}
#[test]
fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
let mut column_metadatas = new_test_column_metadatas();
column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
let table_id = 1;
let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
let table_meta = new_test_raw_table_info();
let name_to_ids = HashMap::from([
("col1".to_string(), 0),
("ts".to_string(), 1),
("col2".to_string(), 2),
]);
let err = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_meta,
&name_to_ids,
&column_metadatas,
)
.unwrap_err();
assert!(
err.to_string()
.contains("Missing table index in column metadata"),
"err: {}",
err
);
}
#[test]
fn test_build_table_info_from_column_metadatas_with_missing_column() {
let mut column_metadatas = new_test_column_metadatas();
// Remove primary key column.
column_metadatas.retain(|c| c.column_id != 0);
let table_id = 1;
let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
let table_meta = new_test_raw_table_info();
let name_to_ids = HashMap::from([
("col1".to_string(), 0),
("ts".to_string(), 1),
("col2".to_string(), 2),
]);
let err = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_meta,
&name_to_ids,
&column_metadatas,
)
.unwrap_err();
assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
let mut column_metadatas = new_test_column_metadatas();
// Remove partition key column.
column_metadatas.retain(|c| c.column_id != 2);
let err = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_meta,
&name_to_ids,
&column_metadatas,
)
.unwrap_err();
assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
}
#[test]
fn test_check_column_metadatas_consistent() {
let column_metadatas = new_test_column_metadatas();
let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
let result =
check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
assert_eq!(result, column_metadatas);
let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
assert!(result.is_none());
}
#[test]
fn test_check_column_metadata_invariants() {
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.push(ColumnMetadata {
column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
});
assert!(check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
}
#[test]
fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
}
#[test]
fn test_check_column_metadata_invariants_mismatch_column_id() {
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
if let Some(col) = new_column_metadatas
.iter_mut()
.find(|c| c.semantic_type == SemanticType::Timestamp)
{
col.column_id = 100;
}
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
if let Some(col) = new_column_metadatas
.iter_mut()
.find(|c| c.semantic_type == SemanticType::Tag)
{
col.column_id = 100;
}
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
}
#[test]
fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
let column_metadatas = new_test_column_metadatas();
let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
metasrv_column_metadatas.push(ColumnMetadata {
column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
});
let result =
resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
.unwrap();
assert_eq!(result, vec![RegionId::new(1024, 0)]);
}
#[test]
fn test_resolve_column_metadatas_with_use_latest_strategy() {
let column_metadatas = new_test_column_metadatas();
let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.push(ColumnMetadata {
column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 3,
});
let mut region_metadata2 =
build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
region_metadata2.schema_version = 2;
let (resolved_column_metadatas, region_ids) =
resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
assert_eq!(resolved_column_metadatas, new_column_metadatas);
}
}
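
Taken together, a rough sketch of how these helpers are intended to compose during table reconciliation; the procedure that actually wires them up is not part of this change, and the bindings (`region_metadatas`, `table_id`, `table_ref`, `table_meta`, `name_to_ids`) are assumed to be supplied by it:

// Sketch of the intended reconciliation flow (not wired up in this commit).
let (column_metadatas, regions_to_reconcile) =
    match check_column_metadatas_consistent(&region_metadatas) {
        // All regions agree on their column metadata: use it as-is.
        Some(column_metadatas) => (column_metadatas, vec![]),
        // Regions disagree: take the highest schema version as the target and
        // collect the regions that must be reconciled towards it.
        None => resolve_column_metadatas_with_latest(&region_metadatas)?,
    };
// Rebuild the table metadata kept in the metasrv from the resolved columns;
// `regions_to_reconcile` would then drive the per-region reconciliation step.
let new_table_meta = build_table_meta_from_column_metadatas(
    table_id,
    table_ref,
    &table_meta,
    &name_to_ids,
    &column_metadatas,
)?;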

View File

@@ -1424,6 +1424,6 @@ mod tests {
create_table_task.table_info.meta.primary_key_indices,
vec![2]
);
assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]);
assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
}
}

View File

@@ -38,6 +38,7 @@ use crate::requests::{
AddColumnRequest, AlterKind, ModifyColumnTypeRequest, SetIndexOptions, TableOptions,
UnsetIndexOptions,
};
use crate::table_reference::TableReference;
pub type TableId = u32;
pub type TableVersion = u64;
@@ -1113,7 +1114,8 @@ pub struct RawTableMeta {
/// The indices of columns in primary key. Note that the index of timestamp column
/// is not included. Order matters to this array.
pub primary_key_indices: Vec<usize>,
/// The indices of columns in value. Order doesn't matter to this array.
/// The indices of columns in value. The index of the timestamp column is included.
/// Order doesn't matter to this array.
pub value_indices: Vec<usize>,
/// Engine type of this table. Usually in small case.
pub engine: String,
@@ -1219,12 +1221,13 @@ impl RawTableInfo {
let mut primary_key_indices = Vec::with_capacity(primary_keys.len());
let mut timestamp_index = None;
let mut value_indices =
Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len() - 1);
Vec::with_capacity(self.meta.schema.column_schemas.len() - primary_keys.len());
let mut column_ids = Vec::with_capacity(self.meta.schema.column_schemas.len());
for (index, column_schema) in self.meta.schema.column_schemas.iter().enumerate() {
if primary_keys.contains(&column_schema.name) {
primary_key_indices.push(index);
} else if column_schema.is_time_index() {
value_indices.push(index);
timestamp_index = Some(index);
} else {
value_indices.push(index);
@@ -1247,6 +1250,15 @@ impl RawTableInfo {
pub fn to_region_options(&self) -> HashMap<String, String> {
HashMap::from(&self.meta.options)
}
/// Returns the table reference.
pub fn table_ref(&self) -> TableReference {
TableReference::full(
self.catalog_name.as_str(),
self.schema_name.as_str(),
self.name.as_str(),
)
}
}
impl From<TableInfo> for RawTableInfo {