diff --git a/Cargo.lock b/Cargo.lock index 2f20de1284..932356dd72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5228,7 +5228,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3bb33593a781504e025e6315572bc5dfdc1dc497#3bb33593a781504e025e6315572bc5dfdc1dc497" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fe8c13f5f3c1fbef63f57fbdd29f0490dfeb987b#fe8c13f5f3c1fbef63f57fbdd29f0490dfeb987b" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index 46e92835f9..c9acdd9fea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,7 +140,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3bb33593a781504e025e6315572bc5dfdc1dc497" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fe8c13f5f3c1fbef63f57fbdd29f0490dfeb987b" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 316d5342db..3c3d37aa3a 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -24,7 +24,7 @@ use greptime_proto::v1::{ }; use snafu::ResultExt; -use crate::error::{self, Result}; +use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result}; use crate::helper::ColumnDataTypeWrapper; use crate::v1::{ColumnDef, ColumnOptions, SemanticType}; @@ -77,6 +77,48 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result { }) } +/// Tries to construct a `ColumnDef` from the given `ColumnSchema`. +/// +/// TODO(weny): Add tests for this function. +pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) -> Result { + let column_datatype = + ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()).map(|w| w.to_parts())?; + + let semantic_type = if column_schema.is_time_index() { + SemanticType::Timestamp + } else if is_primary_key { + SemanticType::Tag + } else { + SemanticType::Field + } as i32; + let comment = column_schema + .metadata() + .get(COMMENT_KEY) + .cloned() + .unwrap_or_default(); + + let default_constraint = match column_schema.default_constraint() { + None => vec![], + Some(v) => v + .clone() + .try_into() + .context(ConvertColumnDefaultConstraintSnafu { + column: &column_schema.name, + })?, + }; + let options = options_from_column_schema(column_schema); + Ok(ColumnDef { + name: column_schema.name.clone(), + data_type: column_datatype.0 as i32, + is_nullable: column_schema.is_nullable(), + default_constraint, + semantic_type, + comment, + datatype_extension: column_datatype.1, + options, + }) +} + /// Constructs a `ColumnOptions` from the given `ColumnSchema`. pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option { let mut options = ColumnOptions::default(); diff --git a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs index 50e0f9c35f..6b8ee4e7fd 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs @@ -13,12 +13,12 @@ // limitations under the License. 
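Note on the new `try_as_column_def` helper above: the diff leaves a TODO for tests. A minimal test sketch along the lines below could cover the tag-column path; it assumes it lives in a `#[cfg(test)]` module inside `src/api/src/v1/column_def.rs`, and the column name and assertions are illustrative only.

```rust
#[cfg(test)]
mod tests {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;

    #[test]
    fn test_try_as_column_def_for_tag_column() {
        // A nullable string column that participates in the primary key.
        let column_schema =
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true);

        let column_def = try_as_column_def(&column_schema, true).unwrap();

        assert_eq!(column_def.name, "host");
        assert!(column_def.is_nullable);
        assert_eq!(column_def.semantic_type, SemanticType::Tag as i32);
        assert!(column_def.default_constraint.is_empty());
    }
}
```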
use common_grpc_expr::alter_expr_to_request; -use itertools::Itertools; use snafu::ResultExt; use table::metadata::{RawTableInfo, TableInfo}; use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; +use crate::ddl::utils::table_info::batch_update_table_info_values; use crate::error; use crate::error::{ConvertAlterTableRequestSnafu, Result}; use crate::key::table_info::TableInfoValue; @@ -48,25 +48,8 @@ impl AlterLogicalTablesProcedure { pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> { let table_info_values = self.build_update_metadata()?; - let manager = &self.context.table_metadata_manager; - let chunk_size = manager.batch_update_table_info_value_chunk_size(); - if table_info_values.len() > chunk_size { - let chunks = table_info_values - .into_iter() - .chunks(chunk_size) - .into_iter() - .map(|check| check.collect::>()) - .collect::>(); - for chunk in chunks { - manager.batch_update_table_info_values(chunk).await?; - } - } else { - manager - .batch_update_table_info_values(table_info_values) - .await?; - } - - Ok(()) + batch_update_table_info_values(&self.context.table_metadata_manager, table_info_values) + .await } pub(crate) fn build_update_metadata( diff --git a/src/common/meta/src/ddl/alter_logical_tables/validator.rs b/src/common/meta/src/ddl/alter_logical_tables/validator.rs index e7295e2cd2..a6407e8403 100644 --- a/src/common/meta/src/ddl/alter_logical_tables/validator.rs +++ b/src/common/meta/src/ddl/alter_logical_tables/validator.rs @@ -21,7 +21,9 @@ use store_api::storage::TableId; use table::table_reference::TableReference; use crate::ddl::utils::table_id::get_all_table_ids_by_names; -use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids; +use crate::ddl::utils::table_info::{ + all_logical_table_routes_have_same_physical_id, get_all_table_info_values_by_table_ids, +}; use crate::error::{ AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableRouteNotFoundSnafu, @@ -146,23 +148,16 @@ impl<'a> AlterLogicalTableValidator<'a> { table_route_manager: &TableRouteManager, table_ids: &[TableId], ) -> Result<()> { - let table_routes = table_route_manager - .table_route_storage() - .batch_get(table_ids) + let all_logical_table_routes_have_same_physical_id = + all_logical_table_routes_have_same_physical_id( + table_route_manager, + table_ids, + self.physical_table_id, + ) .await?; - let physical_table_id = self.physical_table_id; - - let is_same_physical_table = table_routes.iter().all(|r| { - if let Some(TableRouteValue::Logical(r)) = r { - r.physical_table_id() == physical_table_id - } else { - false - } - }); - ensure!( - is_same_physical_table, + all_logical_table_routes_have_same_physical_id, AlterLogicalTablesInvalidArgumentsSnafu { err_msg: "All the tasks should have the same physical table id" } diff --git a/src/common/meta/src/ddl/alter_table/executor.rs b/src/common/meta/src/ddl/alter_table/executor.rs index fda5839965..31bf1da125 100644 --- a/src/common/meta/src/ddl/alter_table/executor.rs +++ b/src/common/meta/src/ddl/alter_table/executor.rs @@ -309,6 +309,5 @@ fn build_new_table_info( "Built new table info: {:?} for table {}, table_id: {}", new_info.meta, table_name, table_id ); - Ok(new_info) } diff --git a/src/common/meta/src/ddl/create_table_template.rs b/src/common/meta/src/ddl/create_table_template.rs index 290fc33308..f091bf8ba2 100644 --- a/src/common/meta/src/ddl/create_table_template.rs +++ 
b/src/common/meta/src/ddl/create_table_template.rs @@ -21,8 +21,7 @@ use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use crate::error; -use crate::error::Result; +use crate::error::{self, Result}; use crate::wal_options_allocator::prepare_wal_options; pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result { diff --git a/src/common/meta/src/ddl/utils/table_info.rs b/src/common/meta/src/ddl/utils/table_info.rs index 155e466797..7f42a383fe 100644 --- a/src/common/meta/src/ddl/utils/table_info.rs +++ b/src/common/meta/src/ddl/utils/table_info.rs @@ -12,13 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; use snafu::OptionExt; use store_api::storage::TableId; +use table::metadata::RawTableInfo; use table::table_reference::TableReference; use crate::error::{Result, TableInfoNotFoundSnafu}; use crate::key::table_info::{TableInfoManager, TableInfoValue}; -use crate::key::DeserializedValueWithBytes; +use crate::key::table_route::{TableRouteManager, TableRouteValue}; +use crate::key::{DeserializedValueWithBytes, TableMetadataManager}; /// Get all table info values by table ids. /// @@ -42,3 +45,56 @@ pub(crate) async fn get_all_table_info_values_by_table_ids<'a>( Ok(table_info_values) } + +/// Checks if all the logical table routes have the same physical table id. +pub(crate) async fn all_logical_table_routes_have_same_physical_id( + table_route_manager: &TableRouteManager, + table_ids: &[TableId], + physical_table_id: TableId, +) -> Result { + let table_routes = table_route_manager + .table_route_storage() + .batch_get(table_ids) + .await?; + + let is_same_physical_table = table_routes.iter().all(|r| { + if let Some(TableRouteValue::Logical(r)) = r { + r.physical_table_id() == physical_table_id + } else { + false + } + }); + + Ok(is_same_physical_table) +} + +/// Batch updates the table info values. +/// +/// The table info values are grouped into chunks, and each chunk is updated in a single transaction. +/// +/// Returns an error if any table info value fails to update. 
+pub(crate) async fn batch_update_table_info_values( + table_metadata_manager: &TableMetadataManager, + table_info_values: Vec<(DeserializedValueWithBytes, RawTableInfo)>, +) -> Result<()> { + let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size(); + if table_info_values.len() > chunk_size { + let chunks = table_info_values + .into_iter() + .chunks(chunk_size) + .into_iter() + .map(|check| check.collect::>()) + .collect::>(); + for chunk in chunks { + table_metadata_manager + .batch_update_table_info_values(chunk) + .await?; + } + } else { + table_metadata_manager + .batch_update_table_info_values(table_info_values) + .await?; + } + + Ok(()) +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 14c52453bd..ab763dadcb 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -878,6 +878,12 @@ pub enum Error { error: object_store::Error, }, + #[snafu(display("Missing column ids"))] + MissingColumnIds { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Missing column in column metadata: {}, table: {}, table_id: {}", column_name, @@ -907,6 +913,24 @@ pub enum Error { table_name: String, table_id: TableId, }, + + #[snafu(display("Failed to convert column def, column: {}", column))] + ConvertColumnDef { + column: String, + #[snafu(implicit)] + location: Location, + source: api::error::Error, + }, + + #[snafu(display( + "Column metadata inconsistencies found in table: {}, table_id: {}", + table_name, + table_id + ))] + ColumnMetadataConflicts { + table_name: String, + table_id: TableId, + }, } pub type Result = std::result::Result; @@ -928,8 +952,10 @@ impl ErrorExt for Error { NoLeader { .. } => StatusCode::TableUnavailable, ValueNotExist { .. } | ProcedurePoisonConflict { .. } + | MissingColumnIds { .. } | MissingColumnInColumnMetadata { .. } - | MismatchColumnId { .. } => StatusCode::Unexpected, + | MismatchColumnId { .. } + | ColumnMetadataConflicts { .. } => StatusCode::Unexpected, Unsupported { .. } => StatusCode::Unsupported, WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable, @@ -1013,6 +1039,7 @@ impl ErrorExt for Error { AbortProcedure { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), PutPoison { source, .. } => source.status_code(), + ConvertColumnDef { source, .. } => source.status_code(), ParseProcedureId { .. } | InvalidNumTopics { .. } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index c13cac7e54..ac02544f68 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -32,13 +32,13 @@ pub mod key; pub mod kv_backend; pub mod leadership_notifier; pub mod lock_key; -pub mod maintenance; pub mod metrics; pub mod node_expiry_listener; pub mod node_manager; pub mod peer; pub mod poison_key; pub mod range_stream; +pub mod reconciliation; pub mod region_keeper; pub mod region_registry; pub mod rpc; diff --git a/src/common/meta/src/maintenance.rs b/src/common/meta/src/maintenance.rs deleted file mode 100644 index 40d8258012..0000000000 --- a/src/common/meta/src/maintenance.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
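The `batch_update_table_info_values` helper above splits the updates into chunks of `batch_update_table_info_value_chunk_size()` so each metadata transaction stays bounded. The chunking idiom itself is plain `itertools`; a standalone sketch with toy data, not project types:

```rust
use itertools::Itertools;

fn main() {
    let items: Vec<u32> = (0..10).collect();
    let chunk_size = 4;

    // `chunks` groups the iterator lazily; each group is collected into its own Vec,
    // mirroring how batch_update_table_info_values batches the table info updates.
    let chunks: Vec<Vec<u32>> = items
        .into_iter()
        .chunks(chunk_size)
        .into_iter()
        .map(|chunk| chunk.collect())
        .collect();

    assert_eq!(chunks, vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9]]);
}
```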
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub(crate) mod reconcile_table; diff --git a/src/common/meta/src/maintenance/reconcile_table.rs b/src/common/meta/src/reconciliation.rs similarity index 89% rename from src/common/meta/src/maintenance/reconcile_table.rs rename to src/common/meta/src/reconciliation.rs index e3ab70b964..8332a3a90f 100644 --- a/src/common/meta/src/maintenance/reconcile_table.rs +++ b/src/common/meta/src/reconciliation.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// TODO(weny): Remove it +#[allow(dead_code)] +pub(crate) mod reconcile_table; // TODO(weny): Remove it #[allow(dead_code)] pub(crate) mod utils; diff --git a/src/common/meta/src/reconciliation/reconcile_table.rs b/src/common/meta/src/reconciliation/reconcile_table.rs new file mode 100644 index 0000000000..2816355c00 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_table.rs @@ -0,0 +1,238 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub(crate) mod reconcile_regions; +pub(crate) mod reconciliation_end; +pub(crate) mod reconciliation_start; +pub(crate) mod resolve_column_metadata; +pub(crate) mod update_table_info; + +use std::any::Any; +use std::fmt::Debug; + +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + Result as ProcedureResult, Status, +}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::metadata::ColumnMetadata; +use store_api::storage::TableId; +use table::metadata::RawTableMeta; +use table::table_name::TableName; +use tonic::async_trait; + +use crate::cache_invalidator::CacheInvalidatorRef; +use crate::error::Result; +use crate::key::table_info::TableInfoValue; +use crate::key::table_route::PhysicalTableRouteValue; +use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; +use crate::node_manager::NodeManagerRef; +use crate::reconciliation::reconcile_table::reconciliation_start::ReconciliationStart; +use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; +use crate::reconciliation::utils::{build_table_meta_from_column_metadatas, Context}; + +pub struct ReconcileTableContext { + pub node_manager: NodeManagerRef, + pub table_metadata_manager: TableMetadataManagerRef, + pub cache_invalidator: CacheInvalidatorRef, + pub persistent_ctx: PersistentContext, + pub volatile_ctx: VolatileContext, +} + +impl ReconcileTableContext { + /// Creates a new [`ReconcileTableContext`] with the given [`Context`] and [`PersistentContext`]. + pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self { + Self { + node_manager: ctx.node_manager, + table_metadata_manager: ctx.table_metadata_manager, + cache_invalidator: ctx.cache_invalidator, + persistent_ctx, + volatile_ctx: VolatileContext::default(), + } + } + + pub(crate) fn table_name(&self) -> &TableName { + &self.persistent_ctx.table_name + } + + pub(crate) fn table_id(&self) -> TableId { + self.persistent_ctx.table_id + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct PersistentContext { + pub(crate) table_id: TableId, + pub(crate) table_name: TableName, + pub(crate) resolve_strategy: ResolveStrategy, + /// The table info value. + /// The value will be set in `ReconciliationStart` state. + pub(crate) table_info_value: Option>, + // The physical table route. + // The value will be set in `ReconciliationStart` state. + pub(crate) physical_table_route: Option, +} + +impl PersistentContext { + pub(crate) fn new( + table_id: TableId, + table_name: TableName, + resolve_strategy: ResolveStrategy, + ) -> Self { + Self { + table_id, + table_name, + resolve_strategy, + table_info_value: None, + physical_table_route: None, + } + } +} + +#[derive(Default)] +pub(crate) struct VolatileContext { + pub(crate) table_meta: Option, +} + +impl ReconcileTableContext { + /// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. + pub(crate) fn build_table_meta( + &self, + column_metadatas: &[ColumnMetadata], + ) -> Result { + // Safety: The table info value is set in `ReconciliationStart` state. 
+ let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap(); + let table_id = self.table_id(); + let table_ref = self.table_name().table_ref(); + let name_to_ids = table_info_value.table_info.name_to_ids(); + let table_meta = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_info_value.table_info.meta, + name_to_ids, + column_metadatas, + )?; + + Ok(table_meta) + } +} + +pub struct ReconcileTableProcedure { + pub context: ReconcileTableContext, + state: Box, +} + +impl ReconcileTableProcedure { + /// Creates a new [`ReconcileTableProcedure`] with the given [`Context`] and [`PersistentContext`]. + pub fn new( + ctx: Context, + table_id: TableId, + table_name: TableName, + resolve_strategy: ResolveStrategy, + ) -> Self { + let persistent_ctx = PersistentContext::new(table_id, table_name, resolve_strategy); + let context = ReconcileTableContext::new(ctx, persistent_ctx); + let state = Box::new(ReconciliationStart); + Self { context, state } + } +} + +impl ReconcileTableProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileTable"; + + pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult { + let ProcedureDataOwned { + state, + persistent_ctx, + } = serde_json::from_str(json).context(FromJsonSnafu)?; + let context = ReconcileTableContext::new(ctx, persistent_ctx); + Ok(Self { context, state }) + } +} + +#[derive(Debug, Serialize)] +struct ProcedureData<'a> { + state: &'a dyn State, + persistent_ctx: &'a PersistentContext, +} + +#[derive(Debug, Deserialize)] +struct ProcedureDataOwned { + state: Box, + persistent_ctx: PersistentContext, +} + +#[async_trait] +impl Procedure for ReconcileTableProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &mut self.state; + + match state.next(&mut self.context, _ctx).await { + Ok((next, status)) => { + *state = next; + Ok(status) + } + Err(e) => { + if e.is_retry_later() { + Err(ProcedureError::retry_later(e)) + } else { + Err(ProcedureError::external(e)) + } + } + } + } + + fn dump(&self) -> ProcedureResult { + let data = ProcedureData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = &self.context.table_name().table_ref(); + + LockKey::new(vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(), + ]) + } +} + +#[async_trait::async_trait] +#[typetag::serde(tag = "reconcile_table_state")] +pub(crate) trait State: Sync + Send + Debug { + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + + async fn next( + &mut self, + ctx: &mut ReconcileTableContext, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)>; + + fn as_any(&self) -> &dyn Any; +} diff --git a/src/common/meta/src/reconciliation/reconcile_table/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_table/reconcile_regions.rs new file mode 100644 index 0000000000..52c90d8a02 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_table/reconcile_regions.rs @@ -0,0 +1,199 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this 
file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::{HashMap, HashSet}; + +use api::v1::column_def::try_as_column_def; +use api::v1::region::region_request::Body; +use api::v1::region::{ + alter_request, AlterRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, SyncColumns, +}; +use api::v1::{ColumnDef, SemanticType}; +use async_trait::async_trait; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use common_telemetry::tracing_context::TracingContext; +use futures::future; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY; +use store_api::storage::{ColumnId, RegionId}; + +use crate::ddl::utils::{add_peer_context_if_needed, extract_column_metadatas}; +use crate::error::{ConvertColumnDefSnafu, Result, UnexpectedSnafu}; +use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd; +use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo; +use crate::reconciliation::reconcile_table::{ReconcileTableContext, State}; +use crate::rpc::router::{find_leaders, region_distribution}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReconcileRegions { + column_metadatas: Vec, + region_ids: HashSet, +} + +impl ReconcileRegions { + pub fn new(column_metadatas: Vec, region_ids: Vec) -> Self { + Self { + column_metadatas, + region_ids: region_ids.into_iter().collect(), + } + } +} + +#[async_trait] +#[typetag::serde] +impl State for ReconcileRegions { + async fn next( + &mut self, + ctx: &mut ReconcileTableContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let table_meta = ctx.build_table_meta(&self.column_metadatas)?; + ctx.volatile_ctx.table_meta = Some(table_meta); + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + + let primary_keys = self + .column_metadatas + .iter() + .filter(|c| c.semantic_type == SemanticType::Tag) + .map(|c| c.column_schema.name.to_string()) + .collect::>(); + let column_defs = self + .column_metadatas + .iter() + .map(|c| { + let column_def = try_as_column_def( + &c.column_schema, + primary_keys.contains(&c.column_schema.name), + ) + .context(ConvertColumnDefSnafu { + column: &c.column_schema.name, + })?; + + Ok((c.column_id, column_def)) + }) + .collect::>>()?; + + // Sends sync column metadatas to datanode. + // Safety: The physical table route is set in `ReconciliationStart` state. + let region_routes = &ctx + .persistent_ctx + .physical_table_route + .as_ref() + .unwrap() + .region_routes; + let region_distribution = region_distribution(region_routes); + let leaders = find_leaders(region_routes) + .into_iter() + .map(|p| (p.id, p)) + .collect::>(); + let mut sync_column_tsks = Vec::with_capacity(self.region_ids.len()); + for (datanode_id, region_role_set) in region_distribution { + if region_role_set.leader_regions.is_empty() { + continue; + } + // Safety: It contains all leaders in the region routes. 
+ let peer = leaders.get(&datanode_id).unwrap(); + for region_id in region_role_set.leader_regions { + let region_id = RegionId::new(ctx.persistent_ctx.table_id, region_id); + if self.region_ids.contains(®ion_id) { + let requester = ctx.node_manager.datanode(peer).await; + let request = make_alter_region_request(region_id, &column_defs); + let peer = peer.clone(); + + sync_column_tsks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer)) + }); + } + } + } + + let mut results = future::join_all(sync_column_tsks) + .await + .into_iter() + .collect::>>()?; + + // Ensures all the column metadatas are the same. + let column_metadatas = + extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?.context( + UnexpectedSnafu { + err_msg: format!( + "The table column metadata schemas from datanodes are not the same, table: {}, table_id: {}", + table_name, + table_id + ), + }, + )?; + + // Checks all column metadatas are consistent, and updates the table info if needed. + if column_metadatas != self.column_metadatas { + info!("Datanode column metadatas are not consistent with metasrv, updating metasrv's column metadatas, table: {}, table_id: {}", table_name, table_id); + // Safety: fetched in the above. + let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap(); + return Ok(( + Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)), + Status::executing(true), + )); + } + + Ok((Box::new(ReconciliationEnd), Status::executing(false))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +/// Makes an alter region request to sync columns. +fn make_alter_region_request( + region_id: RegionId, + column_defs: &[(ColumnId, ColumnDef)], +) -> RegionRequest { + let kind = alter_request::Kind::SyncColumns(to_region_sync_columns(column_defs)); + + let alter_request = AlterRequest { + region_id: region_id.as_u64(), + schema_version: 0, + kind: Some(kind), + }; + + RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(Body::Alter(alter_request)), + } +} + +fn to_region_sync_columns(column_defs: &[(ColumnId, ColumnDef)]) -> SyncColumns { + let region_column_defs = column_defs + .iter() + .map(|(column_id, column_def)| RegionColumnDef { + column_id: *column_id, + column_def: Some(column_def.clone()), + }) + .collect::>(); + + SyncColumns { + column_defs: region_column_defs, + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_table/reconciliation_end.rs b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_end.rs new file mode 100644 index 0000000000..1fc11bed95 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_end.rs @@ -0,0 +1,43 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
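`ReconcileRegions` above fans the sync-columns request out to every leader region and folds the responses back into one `Result` via `future::join_all`. A self-contained sketch of that fan-out idiom, assuming `tokio` as the runtime and a toy error type rather than the crate's `Error`:

```rust
use futures::future;

#[derive(Debug)]
struct Error;

// Stand-in for `requester.handle(request)` on one region leader.
async fn send_request(region_id: u64) -> Result<u64, Error> {
    Ok(region_id)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let region_ids = [1u64, 2, 3];

    // One future per region, just like the `sync_column_tsks` vector above.
    let tasks = region_ids.iter().map(|id| send_request(*id));

    // `join_all` awaits every request; collecting into Result surfaces the first error.
    let responses: Vec<u64> = future::join_all(tasks)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;

    assert_eq!(responses, vec![1, 2, 3]);
    Ok(())
}
```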
+ +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; +use tonic::async_trait; + +use crate::error::Result; +use crate::reconciliation::reconcile_table::{ReconcileTableContext, State}; + +/// The state of the reconciliation end. +/// This state is used to indicate that the reconciliation is done. +#[derive(Debug, Serialize, Deserialize)] +pub struct ReconciliationEnd; + +#[async_trait] +#[typetag::serde] +impl State for ReconciliationEnd { + async fn next( + &mut self, + _ctx: &mut ReconcileTableContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + Ok((Box::new(ReconciliationEnd), Status::done())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_table/reconciliation_start.rs b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_start.rs new file mode 100644 index 0000000000..4ae4d1cbb4 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_start.rs @@ -0,0 +1,108 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use snafu::ensure; + +use crate::ddl::utils::region_metadata_lister::RegionMetadataLister; +use crate::error::{self, Result, UnexpectedSnafu}; +use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveColumnMetadata; +use crate::reconciliation::reconcile_table::{ReconcileTableContext, State}; + +/// The start state of the reconciliation procedure. +/// +/// This state is used to prepare the table for reconciliation. +/// It will: +/// 1. Check the table id and table name consistency. +/// 2. Ensures the table is a physical table. +/// 3. List the region metadatas for the physical table. +#[derive(Debug, Serialize, Deserialize)] +pub struct ReconciliationStart; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconciliationStart { + async fn next( + &mut self, + ctx: &mut ReconcileTableContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + + let (physical_table_id, physical_table_route) = ctx + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await?; + ensure!( + physical_table_id == table_id, + error::UnexpectedSnafu { + err_msg: format!( + "Reconcile table only works for physical table, but got logical table: {}, table_id: {}", + table_name, table_id + ), + } + ); + + info!("Reconciling table: {}, table_id: {}", table_name, table_id); + // TODO(weny): Repairs the table route if needed. + let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone()); + // Always list region metadatas for the physical table. 
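The `State` trait earlier in this diff is a `#[typetag::serde]` trait object, which is what lets `dump()` serialize the boxed current state and `from_json` restore the concrete type. A minimal, self-contained illustration of that round trip, using toy states rather than the real ones; the JSON shape shown in the comment is indicative only:

```rust
use serde::{Deserialize, Serialize};

// Trimmed-down version of the `State` trait: typetag records the concrete type
// under the chosen tag so a `Box<dyn State>` can round-trip through JSON.
#[typetag::serde(tag = "reconcile_table_state")]
trait State: std::fmt::Debug {
    fn name(&self) -> &'static str;
}

#[derive(Debug, Serialize, Deserialize)]
struct ReconciliationStart;

#[typetag::serde]
impl State for ReconciliationStart {
    fn name(&self) -> &'static str {
        "ReconciliationStart"
    }
}

#[derive(Debug, Serialize, Deserialize)]
struct ReconciliationEnd;

#[typetag::serde]
impl State for ReconciliationEnd {
    fn name(&self) -> &'static str {
        "ReconciliationEnd"
    }
}

fn main() {
    let state: Box<dyn State> = Box::new(ReconciliationStart);
    // Roughly: {"reconcile_table_state":"ReconciliationStart"}
    let json = serde_json::to_string(&state).unwrap();
    let restored: Box<dyn State> = serde_json::from_str(&json).unwrap();
    assert_eq!(restored.name(), "ReconciliationStart");
}
```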
+ let region_metadatas = region_metadata_lister + .list(physical_table_id, &physical_table_route.region_routes) + .await?; + + ensure!( + !region_metadatas.is_empty(), + error::UnexpectedSnafu { + err_msg: format!( + "No region metadata found for table: {}, table_id: {}", + table_name, table_id + ), + } + ); + + if region_metadatas.iter().any(|r| r.is_none()) { + return UnexpectedSnafu { + err_msg: format!( + "Some regions are not opened, table: {}, table_id: {}", + table_name, table_id + ), + } + .fail(); + } + + // Persist the physical table route. + // TODO(weny): refetch the physical table route if repair is needed. + ctx.persistent_ctx.physical_table_route = Some(physical_table_route); + let region_metadatas = region_metadatas.into_iter().map(|r| r.unwrap()).collect(); + Ok(( + Box::new(ResolveColumnMetadata::new( + ctx.persistent_ctx.resolve_strategy, + region_metadatas, + )), + // We don't persist the state of this step. + Status::executing(false), + )) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs new file mode 100644 index 0000000000..ee563cbe8d --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs @@ -0,0 +1,135 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use async_trait::async_trait; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use store_api::metadata::RegionMetadata; + +use crate::error::{self, MissingColumnIdsSnafu, Result}; +use crate::reconciliation::reconcile_table::reconcile_regions::ReconcileRegions; +use crate::reconciliation::reconcile_table::update_table_info::UpdateTableInfo; +use crate::reconciliation::reconcile_table::{ReconcileTableContext, State}; +use crate::reconciliation::utils::{ + build_column_metadata_from_table_info, check_column_metadatas_consistent, + resolve_column_metadatas_with_latest, resolve_column_metadatas_with_metasrv, +}; + +/// Strategy for resolving column metadata inconsistencies. +#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +pub(crate) enum ResolveStrategy { + /// Always uses the column metadata from metasrv. + UseMetasrv, + + /// Trusts the latest column metadata from datanode. + UseLatest, + + /// Aborts the resolution process if inconsistencies are detected. + AbortOnConflict, +} + +/// State responsible for resolving inconsistencies in column metadata across physical regions. 
+#[derive(Debug, Serialize, Deserialize)] +pub struct ResolveColumnMetadata { + strategy: ResolveStrategy, + region_metadata: Vec, +} + +impl ResolveColumnMetadata { + pub fn new(strategy: ResolveStrategy, region_metadata: Vec) -> Self { + Self { + strategy, + region_metadata, + } + } +} + +#[async_trait] +#[typetag::serde] +impl State for ResolveColumnMetadata { + async fn next( + &mut self, + ctx: &mut ReconcileTableContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let table_id = ctx.persistent_ctx.table_id; + let table_name = &ctx.persistent_ctx.table_name; + + let table_info_value = ctx + .table_metadata_manager + .table_info_manager() + .get(table_id) + .await? + .with_context(|| error::TableNotFoundSnafu { + table_name: table_name.to_string(), + })?; + ctx.persistent_ctx.table_info_value = Some(table_info_value); + + if let Some(column_metadatas) = check_column_metadatas_consistent(&self.region_metadata) { + // Safety: fetched in the above. + let table_info_value = ctx.persistent_ctx.table_info_value.clone().unwrap(); + info!( + "Column metadatas are consistent for table: {}, table_id: {}.", + table_name, table_id + ); + return Ok(( + Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)), + Status::executing(false), + )); + }; + + match self.strategy { + ResolveStrategy::UseMetasrv => { + let table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap(); + let name_to_ids = table_info_value + .table_info + .name_to_ids() + .context(MissingColumnIdsSnafu)?; + let column_metadata = build_column_metadata_from_table_info( + &table_info_value.table_info.meta.schema.column_schemas, + &table_info_value.table_info.meta.primary_key_indices, + &name_to_ids, + )?; + + let region_ids = + resolve_column_metadatas_with_metasrv(&column_metadata, &self.region_metadata)?; + Ok(( + Box::new(ReconcileRegions::new(column_metadata, region_ids)), + Status::executing(true), + )) + } + ResolveStrategy::UseLatest => { + let (column_metadatas, region_ids) = + resolve_column_metadatas_with_latest(&self.region_metadata)?; + Ok(( + Box::new(ReconcileRegions::new(column_metadatas, region_ids)), + Status::executing(true), + )) + } + ResolveStrategy::AbortOnConflict => error::ColumnMetadataConflictsSnafu { + table_name: table_name.to_string(), + table_id, + } + .fail(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_table/update_table_info.rs b/src/common/meta/src/reconciliation/reconcile_table/update_table_info.rs new file mode 100644 index 0000000000..a6736daed0 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_table/update_table_info.rs @@ -0,0 +1,126 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
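`ResolveColumnMetadata` above leans on the usual snafu idioms: `ensure!` for the `AbortOnConflict` bail-out and `context` for the missing `name_to_ids` map. A toy, self-contained version of that error handling, with a hypothetical `resolve` function and simplified variants:

```rust
use snafu::{ensure, OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display(
        "Column metadata inconsistencies found in table: {}, table_id: {}",
        table_name,
        table_id
    ))]
    ColumnMetadataConflicts { table_name: String, table_id: u32 },

    #[snafu(display("Missing column ids"))]
    MissingColumnIds,
}

type Result<T> = std::result::Result<T, Error>;

// Hypothetical resolver showing the two idioms used by ResolveColumnMetadata:
// `ensure!` aborts with a typed error, `context` lifts a missing Option into one.
fn resolve(consistent: bool, name_to_ids: Option<Vec<u32>>) -> Result<Vec<u32>> {
    ensure!(
        consistent,
        ColumnMetadataConflictsSnafu {
            table_name: "my_table",
            table_id: 1u32,
        }
    );
    name_to_ids.context(MissingColumnIdsSnafu)
}

fn main() {
    assert!(resolve(true, Some(vec![1, 2])).is_ok());
    assert!(resolve(false, Some(vec![])).is_err());
    assert!(resolve(true, None).is_err());
}
```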
+ +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use store_api::metadata::ColumnMetadata; +use tonic::async_trait; + +use crate::cache_invalidator::Context as CacheContext; +use crate::error::Result; +use crate::instruction::CacheIdent; +use crate::key::table_info::TableInfoValue; +use crate::key::DeserializedValueWithBytes; +use crate::reconciliation::reconcile_table::reconciliation_end::ReconciliationEnd; +use crate::reconciliation::reconcile_table::{ReconcileTableContext, State}; +use crate::rpc::router::region_distribution; + +/// Updates the table info with the new column metadatas. +#[derive(Debug, Serialize, Deserialize)] +pub struct UpdateTableInfo { + table_info_value: DeserializedValueWithBytes, + column_metadatas: Vec, +} + +impl UpdateTableInfo { + pub fn new( + table_info_value: DeserializedValueWithBytes, + column_metadatas: Vec, + ) -> Self { + Self { + table_info_value, + column_metadatas, + } + } +} + +#[async_trait] +#[typetag::serde] +impl State for UpdateTableInfo { + async fn next( + &mut self, + ctx: &mut ReconcileTableContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let new_table_meta = match &ctx.volatile_ctx.table_meta { + Some(table_meta) => table_meta.clone(), + None => ctx.build_table_meta(&self.column_metadatas)?, + }; + + let region_routes = &ctx + .persistent_ctx + .physical_table_route + .as_ref() + .unwrap() + .region_routes; + let region_distribution = region_distribution(region_routes); + let current_table_info_value = ctx.persistent_ctx.table_info_value.as_ref().unwrap(); + let new_table_info = { + let mut new_table_info = current_table_info_value.table_info.clone(); + new_table_info.meta = new_table_meta; + new_table_info + }; + + if new_table_info.meta == current_table_info_value.table_info.meta { + info!( + "Table info is already up to date for table: {}, table_id: {}", + ctx.table_name(), + ctx.table_id() + ); + return Ok((Box::new(ReconciliationEnd), Status::executing(true))); + } + + info!( + "Updating table info for table: {}, table_id: {}. 
new table meta: {:?}, current table meta: {:?}", + ctx.table_name(), + ctx.table_id(), + new_table_info.meta, + current_table_info_value.table_info.meta, + ); + ctx.table_metadata_manager + .update_table_info( + current_table_info_value, + Some(region_distribution), + new_table_info, + ) + .await?; + + let table_ref = ctx.table_name().table_ref(); + let table_id = ctx.table_id(); + let cache_ctx = CacheContext { + subject: Some(format!( + "Invalidate table cache by reconciling table {}, table_id: {}", + table_ref, table_id, + )), + }; + ctx.cache_invalidator + .invalidate( + &cache_ctx, + &[ + CacheIdent::TableName(table_ref.into()), + CacheIdent::TableId(table_id), + ], + ) + .await?; + + Ok((Box::new(ReconciliationEnd), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/maintenance/reconcile_table/utils.rs b/src/common/meta/src/reconciliation/utils.rs similarity index 85% rename from src/common/meta/src/maintenance/reconcile_table/utils.rs rename to src/common/meta/src/reconciliation/utils.rs index ef4ac5edcb..1a28b61316 100644 --- a/src/common/meta/src/maintenance/reconcile_table/utils.rs +++ b/src/common/meta/src/reconciliation/utils.rs @@ -16,22 +16,28 @@ use std::collections::{HashMap, HashSet}; use std::fmt; use api::v1::SemanticType; +use common_telemetry::warn; use datatypes::schema::ColumnSchema; use snafu::{ensure, OptionExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{RegionId, TableId}; use table::metadata::RawTableMeta; +use table::table_name::TableName; use table::table_reference::TableReference; +use crate::cache_invalidator::CacheInvalidatorRef; use crate::error::{ - MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu, + self, MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu, }; +use crate::key::table_name::{TableNameKey, TableNameManager}; +use crate::key::TableMetadataManagerRef; +use crate::node_manager::NodeManagerRef; #[derive(Debug, PartialEq, Eq)] -struct PartialRegionMetadata<'a> { - column_metadatas: &'a [ColumnMetadata], - primary_key: &'a [u32], - table_id: TableId, +pub(crate) struct PartialRegionMetadata<'a> { + pub(crate) column_metadatas: &'a [ColumnMetadata], + pub(crate) primary_key: &'a [u32], + pub(crate) table_id: TableId, } impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> { @@ -269,15 +275,17 @@ pub(crate) fn check_column_metadata_invariants( /// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. /// /// Returns an error if: -/// - Any column is missing in the `name_to_ids`. -/// - The column id in table metadata is not the same as the column id in the column metadata. +/// - Any column is missing in the `name_to_ids`(if `name_to_ids` is provided). +/// - The column id in table metadata is not the same as the column id in the column metadata.(if `name_to_ids` is provided) /// - The table index is missing in the column metadata. /// - The primary key or partition key columns are missing in the column metadata. 
+/// +/// TODO(weny): add tests pub(crate) fn build_table_meta_from_column_metadatas( table_id: TableId, table_ref: TableReference, table_meta: &RawTableMeta, - name_to_ids: &HashMap, + name_to_ids: Option>, column_metadata: &[ColumnMetadata], ) -> Result { let column_in_column_metadata = column_metadata @@ -306,10 +314,10 @@ pub(crate) fn build_table_meta_from_column_metadatas( } ); - // Ensures all primary key and partition key exists in the column metadata. - for column_name in primary_key_names.iter().chain(partition_key_names.iter()) { - let column_in_column_metadata = - column_in_column_metadata + if let Some(name_to_ids) = &name_to_ids { + // Ensures all primary key and partition key exists in the column metadata. + for column_name in primary_key_names.iter().chain(partition_key_names.iter()) { + let column_in_column_metadata = column_in_column_metadata .get(column_name) .with_context(|| MissingColumnInColumnMetadataSnafu { column_name: column_name.to_string(), @@ -317,19 +325,25 @@ pub(crate) fn build_table_meta_from_column_metadatas( table_id, })?; - let column_id = *name_to_ids - .get(*column_name) - .with_context(|| UnexpectedSnafu { - err_msg: format!("column id not found in name_to_ids: {}", column_name), - })?; - ensure!( - column_id == column_in_column_metadata.column_id, - MismatchColumnIdSnafu { - column_name: column_name.to_string(), - column_id, - table_name: table_ref.to_string(), - table_id, - } + let column_id = *name_to_ids + .get(*column_name) + .with_context(|| UnexpectedSnafu { + err_msg: format!("column id not found in name_to_ids: {}", column_name), + })?; + ensure!( + column_id == column_in_column_metadata.column_id, + MismatchColumnIdSnafu { + column_name: column_name.to_string(), + column_id, + table_name: table_ref.to_string(), + table_id, + } + ); + } + } else { + warn!( + "`name_to_ids` is not provided, table: {}, table_id: {}", + table_ref, table_id ); } @@ -340,6 +354,7 @@ pub(crate) fn build_table_meta_from_column_metadatas( let time_index = &mut new_raw_table_meta.schema.timestamp_index; let columns = &mut new_raw_table_meta.schema.column_schemas; let column_ids = &mut new_raw_table_meta.column_ids; + let next_column_id = &mut new_raw_table_meta.next_column_id; column_ids.clear(); value_indices.clear(); @@ -368,6 +383,13 @@ pub(crate) fn build_table_meta_from_column_metadatas( column_ids.push(col.column_id); } + *next_column_id = column_ids + .iter() + .max() + .map(|max| max + 1) + .unwrap_or(*next_column_id) + .max(*next_column_id); + if let Some(time_index) = *time_index { new_raw_table_meta.schema.column_schemas[time_index].set_time_index(); } @@ -375,6 +397,49 @@ pub(crate) fn build_table_meta_from_column_metadatas( Ok(new_raw_table_meta) } +/// Validates the table id and name consistency. +/// +/// It will check the table id and table name consistency. +/// If the table id and table name are not consistent, it will return an error. +pub(crate) async fn validate_table_id_and_name( + table_name_manager: &TableNameManager, + table_id: TableId, + table_name: &TableName, +) -> Result<()> { + let table_name_key = TableNameKey::new( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ); + let table_name_value = table_name_manager + .get(table_name_key) + .await? 
+ .with_context(|| error::TableNotFoundSnafu { + table_name: table_name.to_string(), + })?; + + ensure!( + table_name_value.table_id() == table_id, + error::UnexpectedSnafu { + err_msg: format!( + "The table id mismatch for table: {}, expected {}, actual {}", + table_name, + table_id, + table_name_value.table_id() + ), + } + ); + + Ok(()) +} + +#[derive(Clone)] +pub struct Context { + pub node_manager: NodeManagerRef, + pub table_metadata_manager: TableMetadataManagerRef, + pub cache_invalidator: CacheInvalidatorRef, +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -385,12 +450,14 @@ mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder}; use store_api::metadata::ColumnMetadata; + use store_api::storage::RegionId; use table::metadata::{RawTableMeta, TableMetaBuilder}; use table::table_reference::TableReference; use super::*; use crate::ddl::test_util::region_metadata::build_region_metadata; use crate::error::Error; + use crate::reconciliation::utils::check_column_metadatas_consistent; fn new_test_schema() -> Schema { let column_schemas = vec![ @@ -448,6 +515,30 @@ mod tests { table_meta.into() } + #[test] + fn test_build_table_info_from_column_metadatas_identical() { + let column_metadatas = new_test_column_metadatas(); + let table_id = 1; + let table_ref = TableReference::full("test_catalog", "test_schema", "test_table"); + let mut table_meta = new_test_raw_table_info(); + table_meta.column_ids = vec![0, 1, 2]; + let name_to_ids = HashMap::from([ + ("col1".to_string(), 0), + ("ts".to_string(), 1), + ("col2".to_string(), 2), + ]); + + let new_table_meta = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_meta, + Some(name_to_ids), + &column_metadatas, + ) + .unwrap(); + assert_eq!(new_table_meta, table_meta); + } + #[test] fn test_build_table_info_from_column_metadatas() { let mut column_metadatas = new_test_column_metadatas(); @@ -470,7 +561,7 @@ mod tests { table_id, table_ref, &table_meta, - &name_to_ids, + Some(name_to_ids), &column_metadatas, ) .unwrap(); @@ -480,6 +571,7 @@ mod tests { assert_eq!(new_table_meta.value_indices, vec![1, 2]); assert_eq!(new_table_meta.schema.timestamp_index, Some(1)); assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]); + assert_eq!(new_table_meta.next_column_id, 4); } #[test] @@ -499,7 +591,7 @@ mod tests { table_id, table_ref, &table_meta, - &name_to_ids, + Some(name_to_ids), &column_metadatas, ) .unwrap_err(); @@ -524,7 +616,7 @@ mod tests { table_id, table_ref, &table_meta, - &name_to_ids, + Some(name_to_ids), &column_metadatas, ) .unwrap_err(); @@ -555,7 +647,7 @@ mod tests { table_id, table_ref, &table_meta, - &name_to_ids, + Some(name_to_ids.clone()), &column_metadatas, ) .unwrap_err(); @@ -569,7 +661,7 @@ mod tests { table_id, table_ref, &table_meta, - &name_to_ids, + Some(name_to_ids), &column_metadatas, ) .unwrap_err(); diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index dfb8a3da93..3b14f93b5f 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -6,7 +6,7 @@ license.workspace = true [features] testing = [] -enterprise = [] +enterprise = ["mito2/enterprise"] [lints] workspace = true diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index c727f6a5f9..a359e697d7 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -215,7 +215,10 @@ impl DataRegion { AlterKind::SetRegionOptions { options: _ } | AlterKind::UnsetRegionOptions 
{ keys: _ } | AlterKind::SetIndexes { options: _ } - | AlterKind::UnsetIndexes { options: _ } => { + | AlterKind::UnsetIndexes { options: _ } + | AlterKind::SyncColumns { + column_metadatas: _, + } => { let region_id = utils::to_data_region_id(region_id); self.mito .handle_request(region_id, RegionRequest::Alter(request)) diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index c992b322f9..8a5d44c0a0 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -441,6 +441,7 @@ fn columns_to_column_schemas( .collect::>>() } +// TODO(weny): refactor this function to use `try_as_column_def` pub fn column_schemas_to_defs( column_schemas: Vec, primary_keys: &[String], diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 9c426169fa..229f50ed26 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -593,6 +593,19 @@ impl RegionMetadataBuilder { self.drop_defaults(names)?; } AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?, + AlterKind::SyncColumns { column_metadatas } => { + self.primary_key = column_metadatas + .iter() + .filter_map(|column_metadata| { + if column_metadata.semantic_type == SemanticType::Tag { + Some(column_metadata.column_id) + } else { + None + } + }) + .collect::>(); + self.column_metadatas = column_metadatas; + } } Ok(self) } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 74e9dac029..1e5e3c3d14 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -567,6 +567,10 @@ pub enum AlterKind { /// Columns to change. columns: Vec, }, + /// Sync column metadatas. + SyncColumns { + column_metadatas: Vec, + }, } #[derive(Debug, PartialEq, Eq, Clone)] pub struct SetDefault { @@ -755,6 +759,68 @@ impl AlterKind { .iter() .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?; } + AlterKind::SyncColumns { column_metadatas } => { + let new_primary_keys = column_metadatas + .iter() + .filter(|c| c.semantic_type == SemanticType::Tag) + .map(|c| (c.column_schema.name.as_str(), c.column_id)) + .collect::>(); + + let old_primary_keys = metadata + .column_metadatas + .iter() + .filter(|c| c.semantic_type == SemanticType::Tag) + .map(|c| (c.column_schema.name.as_str(), c.column_id)); + + for (name, id) in old_primary_keys { + let primary_key = + new_primary_keys + .get(name) + .with_context(|| InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!("column {} is not a primary key", name), + })?; + + ensure!( + *primary_key == id, + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "column with same name {} has different id, existing: {}, got: {}", + name, id, primary_key + ), + } + ); + } + + let new_ts_column = column_metadatas + .iter() + .find(|c| c.semantic_type == SemanticType::Timestamp) + .map(|c| (c.column_schema.name.as_str(), c.column_id)) + .context(InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: "timestamp column not found", + })?; + + // Safety: timestamp column must exist. 
+ let old_ts_column = metadata + .column_metadatas + .iter() + .find(|c| c.semantic_type == SemanticType::Timestamp) + .map(|c| (c.column_schema.name.as_str(), c.column_id)) + .unwrap(); + + ensure!( + new_ts_column == old_ts_column, + InvalidRegionRequestSnafu { + region_id: metadata.region_id, + err: format!( + "timestamp column {} has different id, existing: {}, got: {}", + old_ts_column.0, old_ts_column.1, new_ts_column.1 + ), + } + ); + } } Ok(()) } @@ -787,9 +853,13 @@ impl AlterKind { AlterKind::DropDefaults { names } => names .iter() .any(|name| metadata.column_by_name(name).is_some()), + AlterKind::SetDefaults { columns } => columns .iter() .any(|x| metadata.column_by_name(&x.name).is_some()), + AlterKind::SyncColumns { column_metadatas } => { + metadata.column_metadatas != *column_metadatas + } } } @@ -924,6 +994,13 @@ impl TryFrom for AlterKind { }) .collect::>>()?, }, + alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns { + column_metadatas: x + .column_defs + .into_iter() + .map(ColumnMetadata::try_from_column_def) + .collect::>>()?, + }, }; Ok(alter_kind) @@ -1296,6 +1373,7 @@ impl fmt::Display for RegionRequest { #[cfg(test)] mod tests { + use api::v1::region::RegionColumnDef; use api::v1::{ColumnDataType, ColumnDef}; use datatypes::prelude::ConcreteDataType; @@ -1784,4 +1862,76 @@ mod tests { metadata.schema_version = 1; request.validate(&metadata).unwrap(); } + + #[test] + fn test_validate_sync_columns() { + let metadata = new_metadata(); + let kind = AlterKind::SyncColumns { + column_metadatas: vec![ + ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 5, + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + "field_2", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 6, + }, + ], + }; + let err = kind.validate(&metadata).unwrap_err(); + assert!(err.to_string().contains("not a primary key")); + + // Change the timestamp column name. + let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone(); + let ts_column = column_metadatas_with_different_ts_column + .iter_mut() + .find(|c| c.semantic_type == SemanticType::Timestamp) + .unwrap(); + ts_column.column_schema.name = "ts1".to_string(); + + let kind = AlterKind::SyncColumns { + column_metadatas: column_metadatas_with_different_ts_column, + }; + let err = kind.validate(&metadata).unwrap_err(); + assert!(err + .to_string() + .contains("timestamp column ts has different id")); + + // Change the primary key column name. + let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone(); + let pk_column = column_metadatas_with_different_pk_column + .iter_mut() + .find(|c| c.column_schema.name == "tag_0") + .unwrap(); + pk_column.column_id = 100; + let kind = AlterKind::SyncColumns { + column_metadatas: column_metadatas_with_different_pk_column, + }; + let err = kind.validate(&metadata).unwrap_err(); + assert!(err + .to_string() + .contains("column with same name tag_0 has different id")); + + // Add a new field column. 
+ let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone(); + column_metadatas_with_new_field_column.push(ColumnMetadata { + column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 4, + }); + let kind = AlterKind::SyncColumns { + column_metadatas: column_metadatas_with_new_field_column, + }; + kind.validate(&metadata).unwrap(); + } }
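For reference, the `RegionMetadataBuilder` arm for `AlterKind::SyncColumns` rebuilds the primary key purely from the synced column metadata: every `Tag` column contributes its column id. A standalone sketch of that derivation; the column names and types are made up, while the struct fields match the diff's own tests and the `api`, `datatypes`, and `store_api` crates are the workspace ones:

```rust
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;

/// Mirrors the new `AlterKind::SyncColumns` arm in `RegionMetadataBuilder`:
/// every Tag column contributes its column id to the rebuilt primary key.
fn primary_key_from(column_metadatas: &[ColumnMetadata]) -> Vec<u32> {
    column_metadatas
        .iter()
        .filter_map(|c| {
            if c.semantic_type == SemanticType::Tag {
                Some(c.column_id)
            } else {
                None
            }
        })
        .collect()
}

fn main() {
    let columns = vec![
        ColumnMetadata {
            column_schema: ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id: 0,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new("value", ConcreteDataType::float64_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 2,
        },
    ];

    assert_eq!(primary_key_from(&columns), vec![0]);
}
```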