feat: introduce reconcile table procedure (#6584)

* feat: introduce `SyncColumns`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce reconcile table procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggesions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-30 12:42:38 +08:00
committed by GitHub
parent d9d1773913
commit ac8493ab4a
24 changed files with 1288 additions and 91 deletions

2
Cargo.lock generated
View File

@@ -5228,7 +5228,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3bb33593a781504e025e6315572bc5dfdc1dc497#3bb33593a781504e025e6315572bc5dfdc1dc497"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fe8c13f5f3c1fbef63f57fbdd29f0490dfeb987b#fe8c13f5f3c1fbef63f57fbdd29f0490dfeb987b"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -140,7 +140,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3bb33593a781504e025e6315572bc5dfdc1dc497" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fe8c13f5f3c1fbef63f57fbdd29f0490dfeb987b" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -24,7 +24,7 @@ use greptime_proto::v1::{
};
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
use crate::helper::ColumnDataTypeWrapper;
use crate::v1::{ColumnDef, ColumnOptions, SemanticType};
@@ -77,6 +77,48 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
})
}
/// Tries to construct a `ColumnDef` from the given `ColumnSchema`.
///
/// TODO(weny): Add tests for this function.
pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) -> Result<ColumnDef> {
let column_datatype =
ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()).map(|w| w.to_parts())?;
let semantic_type = if column_schema.is_time_index() {
SemanticType::Timestamp
} else if is_primary_key {
SemanticType::Tag
} else {
SemanticType::Field
} as i32;
let comment = column_schema
.metadata()
.get(COMMENT_KEY)
.cloned()
.unwrap_or_default();
let default_constraint = match column_schema.default_constraint() {
None => vec![],
Some(v) => v
.clone()
.try_into()
.context(ConvertColumnDefaultConstraintSnafu {
column: &column_schema.name,
})?,
};
let options = options_from_column_schema(column_schema);
Ok(ColumnDef {
name: column_schema.name.clone(),
data_type: column_datatype.0 as i32,
is_nullable: column_schema.is_nullable(),
default_constraint,
semantic_type,
comment,
datatype_extension: column_datatype.1,
options,
})
}
/// Constructs a `ColumnOptions` from the given `ColumnSchema`.
pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
let mut options = ColumnOptions::default();

View File

@@ -13,12 +13,12 @@
// limitations under the License.
use common_grpc_expr::alter_expr_to_request;
use itertools::Itertools;
use snafu::ResultExt;
use table::metadata::{RawTableInfo, TableInfo};
use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::utils::table_info::batch_update_table_info_values;
use crate::error;
use crate::error::{ConvertAlterTableRequestSnafu, Result};
use crate::key::table_info::TableInfoValue;
@@ -48,25 +48,8 @@ impl AlterLogicalTablesProcedure {
pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
let table_info_values = self.build_update_metadata()?;
let manager = &self.context.table_metadata_manager;
let chunk_size = manager.batch_update_table_info_value_chunk_size();
if table_info_values.len() > chunk_size {
let chunks = table_info_values
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|check| check.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
manager.batch_update_table_info_values(chunk).await?;
}
} else {
manager
.batch_update_table_info_values(table_info_values)
.await?;
}
Ok(())
batch_update_table_info_values(&self.context.table_metadata_manager, table_info_values)
.await
}
pub(crate) fn build_update_metadata(

View File

@@ -21,7 +21,9 @@ use store_api::storage::TableId;
use table::table_reference::TableReference;
use crate::ddl::utils::table_id::get_all_table_ids_by_names;
use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
use crate::ddl::utils::table_info::{
all_logical_table_routes_have_same_physical_id, get_all_table_info_values_by_table_ids,
};
use crate::error::{
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu,
TableRouteNotFoundSnafu,
@@ -146,23 +148,16 @@ impl<'a> AlterLogicalTableValidator<'a> {
table_route_manager: &TableRouteManager,
table_ids: &[TableId],
) -> Result<()> {
let table_routes = table_route_manager
.table_route_storage()
.batch_get(table_ids)
let all_logical_table_routes_have_same_physical_id =
all_logical_table_routes_have_same_physical_id(
table_route_manager,
table_ids,
self.physical_table_id,
)
.await?;
let physical_table_id = self.physical_table_id;
let is_same_physical_table = table_routes.iter().all(|r| {
if let Some(TableRouteValue::Logical(r)) = r {
r.physical_table_id() == physical_table_id
} else {
false
}
});
ensure!(
is_same_physical_table,
all_logical_table_routes_have_same_physical_id,
AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: "All the tasks should have the same physical table id"
}

View File

@@ -309,6 +309,5 @@ fn build_new_table_info(
"Built new table info: {:?} for table {}, table_id: {}",
new_info.meta, table_name, table_id
);
Ok(new_info)
}

View File

@@ -21,8 +21,7 @@ use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error;
use crate::error::Result;
use crate::error::{self, Result};
use crate::wal_options_allocator::prepare_wal_options;
pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {

View File

@@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use itertools::Itertools;
use snafu::OptionExt;
use store_api::storage::TableId;
use table::metadata::RawTableInfo;
use table::table_reference::TableReference;
use crate::error::{Result, TableInfoNotFoundSnafu};
use crate::key::table_info::{TableInfoManager, TableInfoValue};
use crate::key::DeserializedValueWithBytes;
use crate::key::table_route::{TableRouteManager, TableRouteValue};
use crate::key::{DeserializedValueWithBytes, TableMetadataManager};
/// Get all table info values by table ids.
///
@@ -42,3 +45,56 @@ pub(crate) async fn get_all_table_info_values_by_table_ids<'a>(
Ok(table_info_values)
}
/// Checks if all the logical table routes have the same physical table id.
pub(crate) async fn all_logical_table_routes_have_same_physical_id(
table_route_manager: &TableRouteManager,
table_ids: &[TableId],
physical_table_id: TableId,
) -> Result<bool> {
let table_routes = table_route_manager
.table_route_storage()
.batch_get(table_ids)
.await?;
let is_same_physical_table = table_routes.iter().all(|r| {
if let Some(TableRouteValue::Logical(r)) = r {
r.physical_table_id() == physical_table_id
} else {
false
}
});
Ok(is_same_physical_table)
}
/// Batch updates the table info values.
///
/// The table info values are grouped into chunks, and each chunk is updated in a single transaction.
///
/// Returns an error if any table info value fails to update.
pub(crate) async fn batch_update_table_info_values(
table_metadata_manager: &TableMetadataManager,
table_info_values: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
) -> Result<()> {
let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size();
if table_info_values.len() > chunk_size {
let chunks = table_info_values
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|check| check.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
table_metadata_manager
.batch_update_table_info_values(chunk)
.await?;
}
} else {
table_metadata_manager
.batch_update_table_info_values(table_info_values)
.await?;
}
Ok(())
}

View File

@@ -878,6 +878,12 @@ pub enum Error {
error: object_store::Error,
},
#[snafu(display("Missing column ids"))]
MissingColumnIds {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Missing column in column metadata: {}, table: {}, table_id: {}",
column_name,
@@ -907,6 +913,24 @@ pub enum Error {
table_name: String,
table_id: TableId,
},
#[snafu(display("Failed to convert column def, column: {}", column))]
ConvertColumnDef {
column: String,
#[snafu(implicit)]
location: Location,
source: api::error::Error,
},
#[snafu(display(
"Column metadata inconsistencies found in table: {}, table_id: {}",
table_name,
table_id
))]
ColumnMetadataConflicts {
table_name: String,
table_id: TableId,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -928,8 +952,10 @@ impl ErrorExt for Error {
NoLeader { .. } => StatusCode::TableUnavailable,
ValueNotExist { .. }
| ProcedurePoisonConflict { .. }
| MissingColumnIds { .. }
| MissingColumnInColumnMetadata { .. }
| MismatchColumnId { .. } => StatusCode::Unexpected,
| MismatchColumnId { .. }
| ColumnMetadataConflicts { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,
@@ -1013,6 +1039,7 @@ impl ErrorExt for Error {
AbortProcedure { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),
PutPoison { source, .. } => source.status_code(),
ConvertColumnDef { source, .. } => source.status_code(),
ParseProcedureId { .. }
| InvalidNumTopics { .. }

View File

@@ -32,13 +32,13 @@ pub mod key;
pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod maintenance;
pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod poison_key;
pub mod range_stream;
pub mod reconciliation;
pub mod region_keeper;
pub mod region_registry;
pub mod rpc;

View File

@@ -1,15 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod reconcile_table;

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_table;
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod utils;

View File

@@ -0,0 +1,238 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod reconcile_regions;
pub(crate) mod reconciliation_end;
pub(crate) mod reconciliation_start;
pub(crate) mod resolve_column_metadata;
pub(crate) mod update_table_info;
use std::any::Any;
use std::fmt::Debug;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::TableId;
use table::metadata::RawTableMeta;
use table::table_name::TableName;
use tonic::async_trait;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::Result;
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_table::reconciliation_start::ReconciliationStart;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::{build_table_meta_from_column_metadatas, Context};
pub struct ReconcileTableContext {
pub node_manager: NodeManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub persistent_ctx: PersistentContext,
pub volatile_ctx: VolatileContext,
}
impl ReconcileTableContext {
/// Creates a new [`ReconcileTableContext`] with the given [`Context`] and [`PersistentContext`].
pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
Self {
node_manager: ctx.node_manager,
table_metadata_manager: ctx.table_metadata_manager,
cache_invalidator: ctx.cache_invalidator,
persistent_ctx,
volatile_ctx: VolatileContext::default(),
}
}
pub(crate) fn table_name(&self) -> &TableName {
&self.persistent_ctx.table_name
}
pub(crate) fn table_id(&self) -> TableId {
self.persistent_ctx.table_id
}
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PersistentContext {
pub(crate) table_id: TableId,
pub(crate) table_name: TableName,
pub(crate) resolve_strategy: ResolveStrategy,
/// The table info value.
/// The value will be set in `ReconciliationStart` state.
pub(crate) table_info_value: Option<DeserializedValueWithBytes<TableInfoValue>>,
// The physical table route.
// The value will be set in `ReconciliationStart` state.
pub(crate) physical_table_route: Option<PhysicalTableRouteValue>,
}
impl PersistentContext {
pub(crate) fn new(
table_id: TableId,
table_name: TableName,
resolve_strategy: ResolveStrategy,
) -> Self {
Self {
table_id,
table_name,
resolve_strategy,
table_info_value: None,
physical_table_route: None,
}
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
pub(crate) table_meta: Option<RawTableMeta>,
}
impl ReconcileTableContext {
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
pub(crate) fn build_table_meta(
&self,
column_metadatas: &[ColumnMetadata],
) -> Result<RawTableMeta> {
// Safety: The table info value is set in `ReconciliationStart` state.
let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
let table_id = self.table_id();
let table_ref = self.table_name().table_ref();
let name_to_ids = table_info_value.table_info.name_to_ids();
let table_meta = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_info_value.table_info.meta,
name_to_ids,
column_metadatas,
)?;
Ok(table_meta)
}
}
pub struct ReconcileTableProcedure {
pub context: ReconcileTableContext,
state: Box<dyn State>,
}
impl ReconcileTableProcedure {
/// Creates a new [`ReconcileTableProcedure`] with the given [`Context`] and [`PersistentContext`].
pub fn new(
ctx: Context,
table_id: TableId,
table_name: TableName,
resolve_strategy: ResolveStrategy,
) -> Self {
let persistent_ctx = PersistentContext::new(table_id, table_name, resolve_strategy);
let context = ReconcileTableContext::new(ctx, persistent_ctx);
let state = Box::new(ReconciliationStart);
Self { context, state }
}
}
impl ReconcileTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileTable";
pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
let ProcedureDataOwned {
state,
persistent_ctx,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let context = ReconcileTableContext::new(ctx, persistent_ctx);
Ok(Self { context, state })
}
}
#[derive(Debug, Serialize)]
struct ProcedureData<'a> {
state: &'a dyn State,
persistent_ctx: &'a PersistentContext,
}
#[derive(Debug, Deserialize)]
struct ProcedureDataOwned {
state: Box<dyn State>,
persistent_ctx: PersistentContext,
}
#[async_trait]
impl Procedure for ReconcileTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
Ok(status)
}
Err(e) => {
if e.is_retry_later() {
Err(ProcedureError::retry_later(e))
} else {
Err(ProcedureError::external(e))
}
}
}
}
fn dump(&self) -> ProcedureResult<String> {
let data = ProcedureData {
state: self.state.as_ref(),
persistent_ctx: &self.context.persistent_ctx,
};
serde_json::to_string(&data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let table_ref = &self.context.table_name().table_ref();
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),
TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(),
])
}
}
#[async_trait::async_trait]
#[typetag::serde(tag = "reconcile_table_state")]
pub(crate) trait State: Sync + Send + Debug {
fn name(&self) -> &'static str {
let type_name = std::any::type_name::<Self>();
// short name
type_name.split("::").last().unwrap_or(type_name)
}
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
fn as_any(&self) -> &dyn Any;
}

View File

@@ -0,0 +1,199 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::{HashMap, HashSet};
use api::v1::column_def::try_as_column_def;
use api::v1::region::region_request::Body;
use api::v1::region::{
alter_request, AlterRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, SyncColumns,
};
use api::v1::{ColumnDef, SemanticType};
use async_trait::async_trait;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::storage::{ColumnId, RegionId};
use crate::ddl::utils::{add_peer_context_if_needed, extract_column_metadatas};
use crate::error::{ConvertColumnDefSnafu, Result, UnexpectedSnafu};
use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd;
use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo;
use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
use crate::rpc::router::{find_leaders, region_distribution};
#[derive(Debug, Serialize, Deserialize)]
pub struct ReconcileRegions {
column_metadatas: Vec<ColumnMetadata>,
region_ids: HashSet<RegionId>,
}
impl ReconcileRegions {
pub fn new(column_metadatas: Vec<ColumnMetadata>, region_ids: Vec<RegionId>) -> Self {
Self {
column_metadatas,
region_ids: region_ids.into_iter().collect(),
}
}
}
#[async_trait]
#[typetag::serde]
impl State for ReconcileRegions {
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_meta = ctx.build_table_meta(&self.column_metadatas)?;
ctx.volatile_ctx.table_meta = Some(table_meta);
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let primary_keys = self
.column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
.map(|c| c.column_schema.name.to_string())
.collect::<HashSet<_>>();
let column_defs = self
.column_metadatas
.iter()
.map(|c| {
let column_def = try_as_column_def(
&c.column_schema,
primary_keys.contains(&c.column_schema.name),
)
.context(ConvertColumnDefSnafu {
column: &c.column_schema.name,
})?;
Ok((c.column_id, column_def))
})
.collect::<Result<Vec<_>>>()?;
// Sends sync column metadatas to datanode.
// Safety: The physical table route is set in `ReconciliationStart` state.
let region_routes = &ctx
.persistent_ctx
.physical_table_route
.as_ref()
.unwrap()
.region_routes;
let region_distribution = region_distribution(region_routes);
let leaders = find_leaders(region_routes)
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
let mut sync_column_tsks = Vec::with_capacity(self.region_ids.len());
for (datanode_id, region_role_set) in region_distribution {
if region_role_set.leader_regions.is_empty() {
continue;
}
// Safety: It contains all leaders in the region routes.
let peer = leaders.get(&datanode_id).unwrap();
for region_id in region_role_set.leader_regions {
let region_id = RegionId::new(ctx.persistent_ctx.table_id, region_id);
if self.region_ids.contains(&region_id) {
let requester = ctx.node_manager.datanode(peer).await;
let request = make_alter_region_request(region_id, &column_defs);
let peer = peer.clone();
sync_column_tsks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer))
});
}
}
}
let mut results = future::join_all(sync_column_tsks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
// Ensures all the column metadatas are the same.
let column_metadatas =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?.context(
UnexpectedSnafu {
err_msg: format!(
"The table column metadata schemas from datanodes are not the same, table: {}, table_id: {}",
table_name,
table_id
),
},
)?;
// Checks all column metadatas are consistent, and updates the table info if needed.
if column_metadatas != self.column_metadatas {
info!("Datanode column metadatas are not consistent with metasrv, updating metasrv's column metadatas, table: {}, table_id: {}", table_name, table_id);
// Safety: fetched in the above.
let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap();
return Ok((
Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
Status::executing(true),
));
}
Ok((Box::new(ReconciliationEnd), Status::executing(false)))
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Makes an alter region request to sync columns.
fn make_alter_region_request(
region_id: RegionId,
column_defs: &[(ColumnId, ColumnDef)],
) -> RegionRequest {
let kind = alter_request::Kind::SyncColumns(to_region_sync_columns(column_defs));
let alter_request = AlterRequest {
region_id: region_id.as_u64(),
schema_version: 0,
kind: Some(kind),
};
RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(Body::Alter(alter_request)),
}
}
fn to_region_sync_columns(column_defs: &[(ColumnId, ColumnDef)]) -> SyncColumns {
let region_column_defs = column_defs
.iter()
.map(|(column_id, column_def)| RegionColumnDef {
column_id: *column_id,
column_def: Some(column_def.clone()),
})
.collect::<Vec<_>>();
SyncColumns {
column_defs: region_column_defs,
}
}

View File

@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use tonic::async_trait;
use crate::error::Result;
use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
/// The state of the reconciliation end.
/// This state is used to indicate that the reconciliation is done.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReconciliationEnd;
#[async_trait]
#[typetag::serde]
impl State for ReconciliationEnd {
async fn next(
&mut self,
_ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(ReconciliationEnd), Status::done()))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,108 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::error::{self, Result, UnexpectedSnafu};
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveColumnMetadata;
use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
/// The start state of the reconciliation procedure.
///
/// This state is used to prepare the table for reconciliation.
/// It will:
/// 1. Check the table id and table name consistency.
/// 2. Ensures the table is a physical table.
/// 3. List the region metadatas for the physical table.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReconciliationStart;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconciliationStart {
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let (physical_table_id, physical_table_route) = ctx
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await?;
ensure!(
physical_table_id == table_id,
error::UnexpectedSnafu {
err_msg: format!(
"Reconcile table only works for physical table, but got logical table: {}, table_id: {}",
table_name, table_id
),
}
);
info!("Reconciling table: {}, table_id: {}", table_name, table_id);
// TODO(weny): Repairs the table route if needed.
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
// Always list region metadatas for the physical table.
let region_metadatas = region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?;
ensure!(
!region_metadatas.is_empty(),
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for table: {}, table_id: {}",
table_name, table_id
),
}
);
if region_metadatas.iter().any(|r| r.is_none()) {
return UnexpectedSnafu {
err_msg: format!(
"Some regions are not opened, table: {}, table_id: {}",
table_name, table_id
),
}
.fail();
}
// Persist the physical table route.
// TODO(weny): refetch the physical table route if repair is needed.
ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
let region_metadatas = region_metadatas.into_iter().map(|r| r.unwrap()).collect();
Ok((
Box::new(ResolveColumnMetadata::new(
ctx.persistent_ctx.resolve_strategy,
region_metadatas,
)),
// We don't persist the state of this step.
Status::executing(false),
))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,135 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use async_trait::async_trait;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::metadata::RegionMetadata;
use crate::error::{self, MissingColumnIdsSnafu, Result};
use crate::reconciliation::reconcile_table::reconcile_regions::ReconcileRegions;
use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo;
use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
use crate::reconciliation::utils::{
build_column_metadata_from_table_info, check_column_metadatas_consistent,
resolve_column_metadatas_with_latest, resolve_column_metadatas_with_metasrv,
};
/// Strategy for resolving column metadata inconsistencies.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub(crate) enum ResolveStrategy {
/// Always uses the column metadata from metasrv.
UseMetasrv,
/// Trusts the latest column metadata from datanode.
UseLatest,
/// Aborts the resolution process if inconsistencies are detected.
AbortOnConflict,
}
/// State responsible for resolving inconsistencies in column metadata across physical regions.
#[derive(Debug, Serialize, Deserialize)]
pub struct ResolveColumnMetadata {
strategy: ResolveStrategy,
region_metadata: Vec<RegionMetadata>,
}
impl ResolveColumnMetadata {
pub fn new(strategy: ResolveStrategy, region_metadata: Vec<RegionMetadata>) -> Self {
Self {
strategy,
region_metadata,
}
}
}
#[async_trait]
#[typetag::serde]
impl State for ResolveColumnMetadata {
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.persistent_ctx.table_id;
let table_name = &ctx.persistent_ctx.table_name;
let table_info_value = ctx
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await?
.with_context(|| error::TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
ctx.persistent_ctx.table_info_value = Some(table_info_value);
if let Some(column_metadatas) = check_column_metadatas_consistent(&self.region_metadata) {
// Safety: fetched in the above.
let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap();
info!(
"Column metadatas are consistent for table: {}, table_id: {}.",
table_name, table_id
);
return Ok((
Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
Status::executing(false),
));
};
match self.strategy {
ResolveStrategy::UseMetasrv => {
let table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap();
let name_to_ids = table_info_value
.table_info
.name_to_ids()
.context(MissingColumnIdsSnafu)?;
let column_metadata = build_column_metadata_from_table_info(
&table_info_value.table_info.meta.schema.column_schemas,
&table_info_value.table_info.meta.primary_key_indices,
&name_to_ids,
)?;
let region_ids =
resolve_column_metadatas_with_metasrv(&column_metadata, &self.region_metadata)?;
Ok((
Box::new(ReconcileRegions::new(column_metadata, region_ids)),
Status::executing(true),
))
}
ResolveStrategy::UseLatest => {
let (column_metadatas, region_ids) =
resolve_column_metadatas_with_latest(&self.region_metadata)?;
Ok((
Box::new(ReconcileRegions::new(column_metadatas, region_ids)),
Status::executing(true),
))
}
ResolveStrategy::AbortOnConflict => error::ColumnMetadataConflictsSnafu {
table_name: table_name.to_string(),
table_id,
}
.fail(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,126 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use store_api::metadata::ColumnMetadata;
use tonic::async_trait;
use crate::cache_invalidator::Context as CacheContext;
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::DeserializedValueWithBytes;
use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd;
use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
use crate::rpc::router::region_distribution;
/// Updates the table info with the new column metadatas.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateTableInfo {
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
column_metadatas: Vec<ColumnMetadata>,
}
impl UpdateTableInfo {
pub fn new(
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
column_metadatas: Vec<ColumnMetadata>,
) -> Self {
Self {
table_info_value,
column_metadatas,
}
}
}
#[async_trait]
#[typetag::serde]
impl State for UpdateTableInfo {
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let new_table_meta = match &ctx.volatile_ctx.table_meta {
Some(table_meta) => table_meta.clone(),
None => ctx.build_table_meta(&self.column_metadatas)?,
};
let region_routes = &ctx
.persistent_ctx
.physical_table_route
.as_ref()
.unwrap()
.region_routes;
let region_distribution = region_distribution(region_routes);
let current_table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap();
let new_table_info = {
let mut new_table_info = current_table_info_value.table_info.clone();
new_table_info.meta = new_table_meta;
new_table_info
};
if new_table_info.meta == current_table_info_value.table_info.meta {
info!(
"Table info is already up to date for table: {}, table_id: {}",
ctx.table_name(),
ctx.table_id()
);
return Ok((Box::new(ReconciliationEnd), Status::executing(true)));
}
info!(
"Updating table info for table: {}, table_id: {}. new table meta: {:?}, current table meta: {:?}",
ctx.table_name(),
ctx.table_id(),
new_table_info.meta,
current_table_info_value.table_info.meta,
);
ctx.table_metadata_manager
.update_table_info(
current_table_info_value,
Some(region_distribution),
new_table_info,
)
.await?;
let table_ref = ctx.table_name().table_ref();
let table_id = ctx.table_id();
let cache_ctx = CacheContext {
subject: Some(format!(
"Invalidate table cache by reconciling table {}, table_id: {}",
table_ref, table_id,
)),
};
ctx.cache_invalidator
.invalidate(
&cache_ctx,
&[
CacheIdent::TableName(table_ref.into()),
CacheIdent::TableId(table_id),
],
)
.await?;
Ok((Box::new(ReconciliationEnd), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -16,22 +16,28 @@ use std::collections::{HashMap, HashSet};
use std::fmt;
use api::v1::SemanticType;
use common_telemetry::warn;
use datatypes::schema::ColumnSchema;
use snafu::{ensure, OptionExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableMeta;
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::{
MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
self, MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
};
use crate::key::table_name::{TableNameKey, TableNameManager};
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
#[derive(Debug, PartialEq, Eq)]
struct PartialRegionMetadata<'a> {
column_metadatas: &'a [ColumnMetadata],
primary_key: &'a [u32],
table_id: TableId,
pub(crate) struct PartialRegionMetadata<'a> {
pub(crate) column_metadatas: &'a [ColumnMetadata],
pub(crate) primary_key: &'a [u32],
pub(crate) table_id: TableId,
}
impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
@@ -269,15 +275,17 @@ pub(crate) fn check_column_metadata_invariants(
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
///
/// Returns an error if:
/// - Any column is missing in the `name_to_ids`.
/// - The column id in table metadata is not the same as the column id in the column metadata.
/// - Any column is missing in the `name_to_ids`(if `name_to_ids` is provided).
/// - The column id in table metadata is not the same as the column id in the column metadata.(if `name_to_ids` is provided)
/// - The table index is missing in the column metadata.
/// - The primary key or partition key columns are missing in the column metadata.
///
/// TODO(weny): add tests
pub(crate) fn build_table_meta_from_column_metadatas(
table_id: TableId,
table_ref: TableReference,
table_meta: &RawTableMeta,
name_to_ids: &HashMap<String, u32>,
name_to_ids: Option<HashMap<String, u32>>,
column_metadata: &[ColumnMetadata],
) -> Result<RawTableMeta> {
let column_in_column_metadata = column_metadata
@@ -306,10 +314,10 @@ pub(crate) fn build_table_meta_from_column_metadatas(
}
);
// Ensures all primary key and partition key exists in the column metadata.
for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
let column_in_column_metadata =
column_in_column_metadata
if let Some(name_to_ids) = &name_to_ids {
// Ensures all primary key and partition key exists in the column metadata.
for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
let column_in_column_metadata = column_in_column_metadata
.get(column_name)
.with_context(|| MissingColumnInColumnMetadataSnafu {
column_name: column_name.to_string(),
@@ -317,19 +325,25 @@ pub(crate) fn build_table_meta_from_column_metadatas(
table_id,
})?;
let column_id = *name_to_ids
.get(*column_name)
.with_context(|| UnexpectedSnafu {
err_msg: format!("column id not found in name_to_ids: {}", column_name),
})?;
ensure!(
column_id == column_in_column_metadata.column_id,
MismatchColumnIdSnafu {
column_name: column_name.to_string(),
column_id,
table_name: table_ref.to_string(),
table_id,
}
let column_id = *name_to_ids
.get(*column_name)
.with_context(|| UnexpectedSnafu {
err_msg: format!("column id not found in name_to_ids: {}", column_name),
})?;
ensure!(
column_id == column_in_column_metadata.column_id,
MismatchColumnIdSnafu {
column_name: column_name.to_string(),
column_id,
table_name: table_ref.to_string(),
table_id,
}
);
}
} else {
warn!(
"`name_to_ids` is not provided, table: {}, table_id: {}",
table_ref, table_id
);
}
@@ -340,6 +354,7 @@ pub(crate) fn build_table_meta_from_column_metadatas(
let time_index = &mut new_raw_table_meta.schema.timestamp_index;
let columns = &mut new_raw_table_meta.schema.column_schemas;
let column_ids = &mut new_raw_table_meta.column_ids;
let next_column_id = &mut new_raw_table_meta.next_column_id;
column_ids.clear();
value_indices.clear();
@@ -368,6 +383,13 @@ pub(crate) fn build_table_meta_from_column_metadatas(
column_ids.push(col.column_id);
}
*next_column_id = column_ids
.iter()
.max()
.map(|max| max + 1)
.unwrap_or(*next_column_id)
.max(*next_column_id);
if let Some(time_index) = *time_index {
new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
}
@@ -375,6 +397,49 @@ pub(crate) fn build_table_meta_from_column_metadatas(
Ok(new_raw_table_meta)
}
/// Validates the table id and name consistency.
///
/// It will check the table id and table name consistency.
/// If the table id and table name are not consistent, it will return an error.
pub(crate) async fn validate_table_id_and_name(
table_name_manager: &TableNameManager,
table_id: TableId,
table_name: &TableName,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
let table_name_value = table_name_manager
.get(table_name_key)
.await?
.with_context(|| error::TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
ensure!(
table_name_value.table_id() == table_id,
error::UnexpectedSnafu {
err_msg: format!(
"The table id mismatch for table: {}, expected {}, actual {}",
table_name,
table_id,
table_name_value.table_id()
),
}
);
Ok(())
}
#[derive(Clone)]
pub struct Context {
pub node_manager: NodeManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
@@ -385,12 +450,14 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use table::metadata::{RawTableMeta, TableMetaBuilder};
use table::table_reference::TableReference;
use super::*;
use crate::ddl::test_util::region_metadata::build_region_metadata;
use crate::error::Error;
use crate::reconciliation::utils::check_column_metadatas_consistent;
fn new_test_schema() -> Schema {
let column_schemas = vec![
@@ -448,6 +515,30 @@ mod tests {
table_meta.into()
}
#[test]
fn test_build_table_info_from_column_metadatas_identical() {
let column_metadatas = new_test_column_metadatas();
let table_id = 1;
let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
let mut table_meta = new_test_raw_table_info();
table_meta.column_ids = vec![0, 1, 2];
let name_to_ids = HashMap::from([
("col1".to_string(), 0),
("ts".to_string(), 1),
("col2".to_string(), 2),
]);
let new_table_meta = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_meta,
Some(name_to_ids),
&column_metadatas,
)
.unwrap();
assert_eq!(new_table_meta, table_meta);
}
#[test]
fn test_build_table_info_from_column_metadatas() {
let mut column_metadatas = new_test_column_metadatas();
@@ -470,7 +561,7 @@ mod tests {
table_id,
table_ref,
&table_meta,
&name_to_ids,
Some(name_to_ids),
&column_metadatas,
)
.unwrap();
@@ -480,6 +571,7 @@ mod tests {
assert_eq!(new_table_meta.value_indices, vec![1, 2]);
assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]);
assert_eq!(new_table_meta.next_column_id, 4);
}
#[test]
@@ -499,7 +591,7 @@ mod tests {
table_id,
table_ref,
&table_meta,
&name_to_ids,
Some(name_to_ids),
&column_metadatas,
)
.unwrap_err();
@@ -524,7 +616,7 @@ mod tests {
table_id,
table_ref,
&table_meta,
&name_to_ids,
Some(name_to_ids),
&column_metadatas,
)
.unwrap_err();
@@ -555,7 +647,7 @@ mod tests {
table_id,
table_ref,
&table_meta,
&name_to_ids,
Some(name_to_ids.clone()),
&column_metadatas,
)
.unwrap_err();
@@ -569,7 +661,7 @@ mod tests {
table_id,
table_ref,
&table_meta,
&name_to_ids,
Some(name_to_ids),
&column_metadatas,
)
.unwrap_err();

View File

@@ -6,7 +6,7 @@ license.workspace = true
[features]
testing = []
enterprise = []
enterprise = ["mito2/enterprise"]
[lints]
workspace = true

View File

@@ -215,7 +215,10 @@ impl DataRegion {
AlterKind::SetRegionOptions { options: _ }
| AlterKind::UnsetRegionOptions { keys: _ }
| AlterKind::SetIndexes { options: _ }
| AlterKind::UnsetIndexes { options: _ } => {
| AlterKind::UnsetIndexes { options: _ }
| AlterKind::SyncColumns {
column_metadatas: _,
} => {
let region_id = utils::to_data_region_id(region_id);
self.mito
.handle_request(region_id, RegionRequest::Alter(request))

View File

@@ -441,6 +441,7 @@ fn columns_to_column_schemas(
.collect::<Result<Vec<ColumnSchema>>>()
}
// TODO(weny): refactor this function to use `try_as_column_def`
pub fn column_schemas_to_defs(
column_schemas: Vec<ColumnSchema>,
primary_keys: &[String],

View File

@@ -593,6 +593,19 @@ impl RegionMetadataBuilder {
self.drop_defaults(names)?;
}
AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
AlterKind::SyncColumns { column_metadatas } => {
self.primary_key = column_metadatas
.iter()
.filter_map(|column_metadata| {
if column_metadata.semantic_type == SemanticType::Tag {
Some(column_metadata.column_id)
} else {
None
}
})
.collect::<Vec<_>>();
self.column_metadatas = column_metadatas;
}
}
Ok(self)
}

View File

@@ -567,6 +567,10 @@ pub enum AlterKind {
/// Columns to change.
columns: Vec<SetDefault>,
},
/// Sync column metadatas.
SyncColumns {
column_metadatas: Vec<ColumnMetadata>,
},
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
@@ -755,6 +759,68 @@ impl AlterKind {
.iter()
.try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
}
AlterKind::SyncColumns { column_metadatas } => {
let new_primary_keys = column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.collect::<HashMap<_, _>>();
let old_primary_keys = metadata
.column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
.map(|c| (c.column_schema.name.as_str(), c.column_id));
for (name, id) in old_primary_keys {
let primary_key =
new_primary_keys
.get(name)
.with_context(|| InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!("column {} is not a primary key", name),
})?;
ensure!(
*primary_key == id,
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"column with same name {} has different id, existing: {}, got: {}",
name, id, primary_key
),
}
);
}
let new_ts_column = column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.context(InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: "timestamp column not found",
})?;
// Safety: timestamp column must exist.
let old_ts_column = metadata
.column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.unwrap();
ensure!(
new_ts_column == old_ts_column,
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"timestamp column {} has different id, existing: {}, got: {}",
old_ts_column.0, old_ts_column.1, new_ts_column.1
),
}
);
}
}
Ok(())
}
@@ -787,9 +853,13 @@ impl AlterKind {
AlterKind::DropDefaults { names } => names
.iter()
.any(|name| metadata.column_by_name(name).is_some()),
AlterKind::SetDefaults { columns } => columns
.iter()
.any(|x| metadata.column_by_name(&x.name).is_some()),
AlterKind::SyncColumns { column_metadatas } => {
metadata.column_metadatas != *column_metadatas
}
}
}
@@ -924,6 +994,13 @@ impl TryFrom<alter_request::Kind> for AlterKind {
})
.collect::<Result<Vec<_>>>()?,
},
alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
column_metadatas: x
.column_defs
.into_iter()
.map(ColumnMetadata::try_from_column_def)
.collect::<Result<Vec<_>>>()?,
},
};
Ok(alter_kind)
@@ -1296,6 +1373,7 @@ impl fmt::Display for RegionRequest {
#[cfg(test)]
mod tests {
use api::v1::region::RegionColumnDef;
use api::v1::{ColumnDataType, ColumnDef};
use datatypes::prelude::ConcreteDataType;
@@ -1784,4 +1862,76 @@ mod tests {
metadata.schema_version = 1;
request.validate(&metadata).unwrap();
}
#[test]
fn test_validate_sync_columns() {
let metadata = new_metadata();
let kind = AlterKind::SyncColumns {
column_metadatas: vec![
ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 5,
},
ColumnMetadata {
column_schema: ColumnSchema::new(
"field_2",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 6,
},
],
};
let err = kind.validate(&metadata).unwrap_err();
assert!(err.to_string().contains("not a primary key"));
// Change the timestamp column name.
let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
let ts_column = column_metadatas_with_different_ts_column
.iter_mut()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.unwrap();
ts_column.column_schema.name = "ts1".to_string();
let kind = AlterKind::SyncColumns {
column_metadatas: column_metadatas_with_different_ts_column,
};
let err = kind.validate(&metadata).unwrap_err();
assert!(err
.to_string()
.contains("timestamp column ts has different id"));
// Change the primary key column name.
let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
let pk_column = column_metadatas_with_different_pk_column
.iter_mut()
.find(|c| c.column_schema.name == "tag_0")
.unwrap();
pk_column.column_id = 100;
let kind = AlterKind::SyncColumns {
column_metadatas: column_metadatas_with_different_pk_column,
};
let err = kind.validate(&metadata).unwrap_err();
assert!(err
.to_string()
.contains("column with same name tag_0 has different id"));
// Add a new field column.
let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
column_metadatas_with_new_field_column.push(ColumnMetadata {
column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 4,
});
let kind = AlterKind::SyncColumns {
column_metadatas: column_metadatas_with_new_field_column,
};
kind.validate(&metadata).unwrap();
}
}