diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index ad6a1de613..13f7f08a5f 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -44,6 +44,7 @@ pub mod create_flow; pub mod create_logical_tables; pub mod create_table; mod create_table_template; +pub(crate) use create_table_template::{build_template_from_raw_table_info, CreateRequestBuilder}; pub mod create_view; pub mod drop_database; pub mod drop_flow; diff --git a/src/common/meta/src/ddl/create_table_template.rs b/src/common/meta/src/ddl/create_table_template.rs index f091bf8ba2..55adc2cded 100644 --- a/src/common/meta/src/ddl/create_table_template.rs +++ b/src/common/meta/src/ddl/create_table_template.rs @@ -14,16 +14,57 @@ use std::collections::HashMap; +use api::v1::column_def::try_as_column_def; use api::v1::region::{CreateRequest, RegionColumnDef}; use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; -use snafu::OptionExt; -use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; +use snafu::{OptionExt, ResultExt}; +use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; use store_api::storage::{RegionId, RegionNumber}; -use table::metadata::TableId; +use table::metadata::{RawTableInfo, TableId}; use crate::error::{self, Result}; use crate::wal_options_allocator::prepare_wal_options; +/// Builds a [CreateRequest] from a [RawTableInfo]. +/// +/// Note: **This method is only used for creating logical tables.** +pub(crate) fn build_template_from_raw_table_info( + raw_table_info: &RawTableInfo, +) -> Result { + let primary_key_indices = &raw_table_info.meta.primary_key_indices; + let column_defs = raw_table_info + .meta + .schema + .column_schemas + .iter() + .enumerate() + .map(|(i, c)| { + let is_primary_key = primary_key_indices.contains(&i); + let column_def = try_as_column_def(c, is_primary_key) + .context(error::ConvertColumnDefSnafu { column: &c.name })?; + + Ok(RegionColumnDef { + column_def: Some(column_def), + // The column id will be overridden by the metric engine. + // So we just use the index as the column id. + column_id: i as u32, + }) + }) + .collect::>>()?; + + let options = HashMap::from(&raw_table_info.meta.options); + let template = CreateRequest { + region_id: 0, + engine: METRIC_ENGINE_NAME.to_string(), + column_defs, + primary_key: primary_key_indices.iter().map(|i| *i as u32).collect(), + path: String::new(), + options, + }; + + Ok(template) +} + pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result { let column_defs = create_table_expr .column_defs diff --git a/src/common/meta/src/reconciliation.rs b/src/common/meta/src/reconciliation.rs index 9d0debd3ea..7361a75080 100644 --- a/src/common/meta/src/reconciliation.rs +++ b/src/common/meta/src/reconciliation.rs @@ -20,4 +20,7 @@ pub(crate) mod reconcile_database; pub(crate) mod reconcile_table; // TODO(weny): Remove it #[allow(dead_code)] +pub(crate) mod reconcile_logical_tables; +// TODO(weny): Remove it +#[allow(dead_code)] pub(crate) mod utils; diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables.rs new file mode 100644 index 0000000000..e2520f8b1a --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables.rs @@ -0,0 +1,241 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod reconcile_regions; +pub(crate) mod reconciliation_end; +pub(crate) mod reconciliation_start; +pub(crate) mod resolve_table_metadatas; +pub(crate) mod update_table_infos; + +use std::any::Any; +use std::fmt::Debug; + +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + Result as ProcedureResult, Status, +}; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::metadata::ColumnMetadata; +use store_api::storage::TableId; +use table::metadata::RawTableInfo; +use table::table_name::TableName; + +use crate::cache_invalidator::CacheInvalidatorRef; +use crate::error::Result; +use crate::key::table_info::TableInfoValue; +use crate::key::table_route::PhysicalTableRouteValue; +use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; +use crate::node_manager::NodeManagerRef; +use crate::reconciliation::reconcile_logical_tables::reconciliation_start::ReconciliationStart; +use crate::reconciliation::utils::Context; + +pub struct ReconcileLogicalTablesContext { + pub node_manager: NodeManagerRef, + pub table_metadata_manager: TableMetadataManagerRef, + pub cache_invalidator: CacheInvalidatorRef, + pub persistent_ctx: PersistentContext, +} + +impl ReconcileLogicalTablesContext { + /// Creates a new [`ReconcileLogicalTablesContext`] with the given [`Context`] and [`PersistentContext`]. + pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self { + Self { + node_manager: ctx.node_manager, + table_metadata_manager: ctx.table_metadata_manager, + cache_invalidator: ctx.cache_invalidator, + persistent_ctx, + } + } + + pub(crate) fn table_name(&self) -> &TableName { + &self.persistent_ctx.table_name + } + + pub(crate) fn table_id(&self) -> TableId { + self.persistent_ctx.table_id + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct PersistentContext { + pub(crate) table_id: TableId, + pub(crate) table_name: TableName, + // The logical tables need to be reconciled. + // The logical tables belongs to the physical table. + pub(crate) logical_tables: Vec, + // The logical table ids. + // The value will be set in `ReconciliationStart` state. + pub(crate) logical_table_ids: Vec, + /// The table info value. + /// The value will be set in `ReconciliationStart` state. + pub(crate) table_info_value: Option>, + // The physical table route. + // The value will be set in `ReconciliationStart` state. + pub(crate) physical_table_route: Option, + // The table infos to be updated. + // The value will be set in `ResolveTableMetadatas` state. + pub(crate) update_table_infos: Vec<(TableId, Vec)>, + // The table infos to be created. + // The value will be set in `ResolveTableMetadatas` state. + pub(crate) create_tables: Vec<(TableId, RawTableInfo)>, + // Whether the procedure is a subprocedure. + pub(crate) is_subprocedure: bool, +} + +impl PersistentContext { + pub(crate) fn new( + table_id: TableId, + table_name: TableName, + logical_tables: Vec<(TableId, TableName)>, + is_subprocedure: bool, + ) -> Self { + let (logical_table_ids, logical_tables) = logical_tables.into_iter().unzip(); + + Self { + table_id, + table_name, + logical_tables, + logical_table_ids, + table_info_value: None, + physical_table_route: None, + update_table_infos: vec![], + create_tables: vec![], + is_subprocedure, + } + } +} + +pub struct ReconcileLogicalTablesProcedure { + pub context: ReconcileLogicalTablesContext, + state: Box, +} + +#[derive(Debug, Serialize)] +struct ProcedureData<'a> { + state: &'a dyn State, + persistent_ctx: &'a PersistentContext, +} + +#[derive(Debug, Deserialize)] +struct ProcedureDataOwned { + state: Box, + persistent_ctx: PersistentContext, +} + +impl ReconcileLogicalTablesProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileLogicalTables"; + + pub fn new( + ctx: Context, + table_id: TableId, + table_name: TableName, + logical_tables: Vec<(TableId, TableName)>, + is_subprocedure: bool, + ) -> Self { + let persistent_ctx = + PersistentContext::new(table_id, table_name, logical_tables, is_subprocedure); + let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx); + let state = Box::new(ReconciliationStart); + Self { context, state } + } + + pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult { + let ProcedureDataOwned { + state, + persistent_ctx, + } = serde_json::from_str(json).context(FromJsonSnafu)?; + let context = ReconcileLogicalTablesContext::new(ctx, persistent_ctx); + Ok(Self { context, state }) + } +} + +#[async_trait] +impl Procedure for ReconcileLogicalTablesProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &mut self.state; + + match state.next(&mut self.context, _ctx).await { + Ok((next, status)) => { + *state = next; + Ok(status) + } + Err(e) => { + if e.is_retry_later() { + Err(ProcedureError::retry_later(e)) + } else { + Err(ProcedureError::external(e)) + } + } + } + } + + fn dump(&self) -> ProcedureResult { + let data = ProcedureData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = &self.context.table_name().table_ref(); + + let mut table_ids = self + .context + .persistent_ctx + .logical_table_ids + .iter() + .map(|t| TableLock::Write(*t).into()) + .collect::>(); + table_ids.sort_unstable(); + table_ids.push(TableLock::Read(self.context.table_id()).into()); + if self.context.persistent_ctx.is_subprocedure { + // The catalog and schema are already locked by the parent procedure. + // Only lock the table name. + return LockKey::new(table_ids); + } + let mut keys = vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + ]; + keys.extend(table_ids); + LockKey::new(keys) + } +} + +#[async_trait::async_trait] +#[typetag::serde(tag = "reconcile_logical_tables_state")] +pub(crate) trait State: Sync + Send + Debug { + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + + async fn next( + &mut self, + ctx: &mut ReconcileLogicalTablesContext, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)>; + + fn as_any(&self) -> &dyn Any; +} diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs new file mode 100644 index 0000000000..98e8290425 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs @@ -0,0 +1,146 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; + +use api::v1::region::{region_request, CreateRequests, RegionRequest, RegionRequestHeader}; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use common_telemetry::tracing_context::TracingContext; +use futures::future; +use serde::{Deserialize, Serialize}; +use store_api::storage::{RegionId, TableId}; +use table::metadata::RawTableInfo; + +use crate::ddl::utils::{add_peer_context_if_needed, region_storage_path}; +use crate::ddl::{build_template_from_raw_table_info, CreateRequestBuilder}; +use crate::error::Result; +use crate::reconciliation::reconcile_logical_tables::update_table_infos::UpdateTableInfos; +use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State}; +use crate::rpc::router::{find_leaders, region_distribution}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReconcileRegions; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileRegions { + async fn next( + &mut self, + ctx: &mut ReconcileLogicalTablesContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + if ctx.persistent_ctx.create_tables.is_empty() { + return Ok((Box::new(UpdateTableInfos), Status::executing(false))); + } + + // Safety: previous steps ensure the physical table route is set. + let region_routes = &ctx + .persistent_ctx + .physical_table_route + .as_ref() + .unwrap() + .region_routes; + + let region_distribution = region_distribution(region_routes); + let leaders = find_leaders(region_routes) + .into_iter() + .map(|p| (p.id, p)) + .collect::>(); + let mut create_table_tasks = Vec::with_capacity(leaders.len()); + for (datanode_id, region_role_set) in region_distribution { + if region_role_set.leader_regions.is_empty() { + continue; + } + // Safety: It contains all leaders in the region routes. + let peer = leaders.get(&datanode_id).unwrap().clone(); + let request = self.make_request(®ion_role_set.leader_regions, ctx)?; + let requester = ctx.node_manager.datanode(&peer).await; + create_table_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer)) + }); + } + + future::join_all(create_table_tasks) + .await + .into_iter() + .collect::>>()?; + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + info!( + "Reconciled regions for logical tables: {:?}, physical table: {}, table_id: {}", + ctx.persistent_ctx + .create_tables + .iter() + .map(|(table_id, _)| table_id) + .collect::>(), + table_id, + table_name + ); + ctx.persistent_ctx.create_tables.clear(); + return Ok((Box::new(UpdateTableInfos), Status::executing(true))); + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl ReconcileRegions { + fn make_request( + &self, + region_numbers: &[u32], + ctx: &ReconcileLogicalTablesContext, + ) -> Result { + let physical_table_id = ctx.table_id(); + let table_name = ctx.table_name(); + let create_tables = &ctx.persistent_ctx.create_tables; + + let mut requests = Vec::with_capacity(region_numbers.len() * create_tables.len()); + for (table_id, table_info) in create_tables { + let request_builder = + create_region_request_from_raw_table_info(table_info, physical_table_id)?; + let storage_path = + region_storage_path(&table_name.catalog_name, &table_name.schema_name); + + for region_number in region_numbers { + let region_id = RegionId::new(*table_id, *region_number); + let one_region_request = + request_builder.build_one(region_id, storage_path.clone(), &HashMap::new()); + requests.push(one_region_request); + } + } + + Ok(RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::Creates(CreateRequests { requests })), + }) + } +} + +/// Creates a region request builder from a raw table info. +fn create_region_request_from_raw_table_info( + raw_table_info: &RawTableInfo, + physical_table_id: TableId, +) -> Result { + let template = build_template_from_raw_table_info(raw_table_info)?; + Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) +} diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_end.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_end.rs new file mode 100644 index 0000000000..09beaa702b --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_end.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReconciliationEnd; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconciliationEnd { + async fn next( + &mut self, + _ctx: &mut ReconcileLogicalTablesContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + Ok((Box::new(ReconciliationEnd), Status::done())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_start.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_start.rs new file mode 100644 index 0000000000..b74cec603a --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_start.rs @@ -0,0 +1,171 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt}; +use store_api::storage::TableId; +use table::table_name::TableName; + +use crate::ddl::utils::region_metadata_lister::RegionMetadataLister; +use crate::ddl::utils::table_id::get_all_table_ids_by_names; +use crate::ddl::utils::table_info::all_logical_table_routes_have_same_physical_id; +use crate::error::{self, Result}; +use crate::reconciliation::reconcile_logical_tables::resolve_table_metadatas::ResolveTableMetadatas; +use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State}; +use crate::reconciliation::utils::check_column_metadatas_consistent; + +/// The start state of the reconciliation procedure. +#[derive(Debug, Serialize, Deserialize)] +pub struct ReconciliationStart; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconciliationStart { + async fn next( + &mut self, + ctx: &mut ReconcileLogicalTablesContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + let (physical_table_id, physical_table_route) = ctx + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await?; + ensure!( + physical_table_id == table_id, + error::UnexpectedSnafu { + err_msg: format!( + "Expected physical table: {}, but it's a logical table of table: {}", + table_name, physical_table_id + ), + } + ); + + info!( + "Starting reconciliation for logical table: table_id: {}, table_name: {}", + table_id, table_name + ); + + let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone()); + let region_metadatas = region_metadata_lister + .list(physical_table_id, &physical_table_route.region_routes) + .await?; + + ensure!( + !region_metadatas.is_empty(), + error::UnexpectedSnafu { + err_msg: format!( + "No region metadata found for table: {}, table_id: {}", + table_name, table_id + ), + } + ); + + if region_metadatas.iter().any(|r| r.is_none()) { + return error::UnexpectedSnafu { + err_msg: format!( + "Some regions of the physical table are not open. Table: {}, table_id: {}", + table_name, table_id + ), + } + .fail(); + } + + // Safety: checked above + let region_metadatas = region_metadatas + .into_iter() + .map(|r| r.unwrap()) + .collect::>(); + let _region_metadata = check_column_metadatas_consistent(®ion_metadatas).context( + error::UnexpectedSnafu { + err_msg: format!( + "Column metadatas are not consistent for table: {}, table_id: {}", + table_name, table_id + ), + }, + )?; + + // TODO(weny): ensure all columns in region metadata can be found in table info. + + // Validates the logical tables. + Self::validate_schema(&ctx.persistent_ctx.logical_tables)?; + let table_refs = ctx + .persistent_ctx + .logical_tables + .iter() + .map(|t| t.table_ref()) + .collect::>(); + let table_ids = get_all_table_ids_by_names( + ctx.table_metadata_manager.table_name_manager(), + &table_refs, + ) + .await?; + Self::validate_logical_table_routes(ctx, &table_ids).await?; + + ctx.persistent_ctx.physical_table_route = Some(physical_table_route); + ctx.persistent_ctx.logical_table_ids = table_ids; + Ok((Box::new(ResolveTableMetadatas), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl ReconciliationStart { + /// Validates all the logical tables have the same catalog and schema. + fn validate_schema(logical_tables: &[TableName]) -> Result<()> { + let is_same_schema = logical_tables.windows(2).all(|pair| { + pair[0].catalog_name == pair[1].catalog_name + && pair[0].schema_name == pair[1].schema_name + }); + + ensure!( + is_same_schema, + error::UnexpectedSnafu { + err_msg: "The logical tables have different schemas", + } + ); + + Ok(()) + } + + async fn validate_logical_table_routes( + ctx: &mut ReconcileLogicalTablesContext, + table_ids: &[TableId], + ) -> Result<()> { + let all_logical_table_routes_have_same_physical_id = + all_logical_table_routes_have_same_physical_id( + ctx.table_metadata_manager.table_route_manager(), + table_ids, + ctx.table_id(), + ) + .await?; + + ensure!( + all_logical_table_routes_have_same_physical_id, + error::UnexpectedSnafu { + err_msg: "All the logical tables should have the same physical table id", + } + ); + + Ok(()) + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/resolve_table_metadatas.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/resolve_table_metadatas.rs new file mode 100644 index 0000000000..3504337a90 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/resolve_table_metadatas.rs @@ -0,0 +1,130 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::{info, warn}; +use serde::{Deserialize, Serialize}; +use snafu::ensure; + +use crate::ddl::utils::region_metadata_lister::RegionMetadataLister; +use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids; +use crate::error::{self, Result}; +use crate::reconciliation::reconcile_logical_tables::reconcile_regions::ReconcileRegions; +use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State}; +use crate::reconciliation::utils::{ + check_column_metadatas_consistent, need_update_logical_table_info, +}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ResolveTableMetadatas; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ResolveTableMetadatas { + async fn next( + &mut self, + ctx: &mut ReconcileLogicalTablesContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let table_names = ctx + .persistent_ctx + .logical_tables + .iter() + .map(|t| t.table_ref()) + .collect::>(); + let table_ids = &ctx.persistent_ctx.logical_table_ids; + + let mut create_tables = vec![]; + let mut update_table_infos = vec![]; + + let table_info_values = get_all_table_info_values_by_table_ids( + ctx.table_metadata_manager.table_info_manager(), + table_ids, + &table_names, + ) + .await?; + + // Safety: The physical table route is set in `ReconciliationStart` state. + let region_routes = &ctx + .persistent_ctx + .physical_table_route + .as_ref() + .unwrap() + .region_routes; + let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone()); + for (table_id, table_info_value) in table_ids.iter().zip(table_info_values.iter()) { + let region_metadatas = region_metadata_lister + .list(*table_id, region_routes) + .await?; + + ensure!( + !region_metadatas.is_empty(), + error::UnexpectedSnafu { + err_msg: format!( + "No region metadata found for table: {}, table_id: {}", + table_info_value.table_info.name, table_id + ), + } + ); + + if region_metadatas.iter().any(|r| r.is_none()) { + create_tables.push((*table_id, table_info_value.table_info.clone())); + continue; + } + + // Safety: The physical table route is set in `ReconciliationStart` state. + let region_metadatas = region_metadatas + .into_iter() + .map(|r| r.unwrap()) + .collect::>(); + if let Some(column_metadatas) = check_column_metadatas_consistent(®ion_metadatas) { + if need_update_logical_table_info(&table_info_value.table_info, &column_metadatas) { + update_table_infos.push((*table_id, column_metadatas)); + } + } else { + // If the logical regions have inconsistent column metadatas, it won't affect read and write. + // It's safe to continue if the column metadatas of the logical table are inconsistent. + warn!( + "Found inconsistent column metadatas for table: {}, table_id: {}. Remaining the inconsistent column metadatas", + table_info_value.table_info.name, table_id + ); + } + } + + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + info!( + "Resolving table metadatas for physical table: {}, table_id: {}, updating table infos: {:?}, creating tables: {:?}", + table_name, + table_id, + update_table_infos + .iter() + .map(|(table_id, _)| *table_id) + .collect::>(), + create_tables + .iter() + .map(|(table_id, _)| *table_id) + .collect::>(), + ); + ctx.persistent_ctx.update_table_infos = update_table_infos; + ctx.persistent_ctx.create_tables = create_tables; + Ok((Box::new(ReconcileRegions), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs new file mode 100644 index 0000000000..fd7c276f8b --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs @@ -0,0 +1,178 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; + +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use store_api::metadata::ColumnMetadata; +use store_api::storage::TableId; +use table::metadata::RawTableInfo; +use table::table_name::TableName; +use table::table_reference::TableReference; + +use crate::cache_invalidator::Context as CacheContext; +use crate::ddl::utils::table_info::{ + batch_update_table_info_values, get_all_table_info_values_by_table_ids, +}; +use crate::error::Result; +use crate::instruction::CacheIdent; +use crate::reconciliation::reconcile_logical_tables::reconciliation_end::ReconciliationEnd; +use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State}; +use crate::reconciliation::utils::build_table_meta_from_column_metadatas; + +#[derive(Debug, Serialize, Deserialize)] +pub struct UpdateTableInfos; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for UpdateTableInfos { + async fn next( + &mut self, + ctx: &mut ReconcileLogicalTablesContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + if ctx.persistent_ctx.update_table_infos.is_empty() { + return Ok((Box::new(ReconciliationEnd), Status::executing(false))); + } + + let all_table_names = ctx + .persistent_ctx + .logical_table_ids + .iter() + .cloned() + .zip( + ctx.persistent_ctx + .logical_tables + .iter() + .map(|t| t.table_ref()), + ) + .collect::>(); + let table_ids = ctx + .persistent_ctx + .update_table_infos + .iter() + .map(|(table_id, _)| *table_id) + .collect::>(); + let table_names = table_ids + .iter() + .map(|table_id| *all_table_names.get(table_id).unwrap()) + .collect::>(); + let table_info_values = get_all_table_info_values_by_table_ids( + ctx.table_metadata_manager.table_info_manager(), + &table_ids, + &table_names, + ) + .await?; + + let mut table_info_values_to_update = + Vec::with_capacity(ctx.persistent_ctx.update_table_infos.len()); + for ((table_id, column_metadatas), table_info_value) in ctx + .persistent_ctx + .update_table_infos + .iter() + .zip(table_info_values.into_iter()) + { + let new_table_info = Self::build_new_table_info( + *table_id, + column_metadatas, + &table_info_value.table_info, + )?; + table_info_values_to_update.push((table_info_value, new_table_info)); + } + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + + batch_update_table_info_values(&ctx.table_metadata_manager, table_info_values_to_update) + .await?; + + info!( + "Updated table infos for logical tables: {:?}, physical table: {}, table_id: {}", + ctx.persistent_ctx + .update_table_infos + .iter() + .map(|(table_id, _)| table_id) + .collect::>(), + table_id, + table_name, + ); + + let cache_ctx = CacheContext { + subject: Some(format!( + "Invalidate table by reconcile logical tables, physical_table_id: {}", + table_id + )), + }; + let idents = Self::build_cache_ident_keys(table_id, table_name, &table_ids, &table_names); + ctx.cache_invalidator + .invalidate(&cache_ctx, &idents) + .await?; + + ctx.persistent_ctx.update_table_infos.clear(); + Ok((Box::new(ReconciliationEnd), Status::executing(false))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl UpdateTableInfos { + fn build_new_table_info( + table_id: TableId, + column_metadatas: &[ColumnMetadata], + table_info: &RawTableInfo, + ) -> Result { + let table_ref = table_info.table_ref(); + let table_meta = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_info.meta, + None, + column_metadatas, + )?; + + let mut new_table_info = table_info.clone(); + new_table_info.meta = table_meta; + new_table_info.ident.version = table_info.ident.version + 1; + new_table_info.sort_columns(); + + Ok(new_table_info) + } + + fn build_cache_ident_keys( + physical_table_id: TableId, + physical_table_name: &TableName, + table_ids: &[TableId], + table_names: &[TableReference], + ) -> Vec { + let mut cache_keys = Vec::with_capacity(table_ids.len() * 2 + 2); + cache_keys.push(CacheIdent::TableId(physical_table_id)); + cache_keys.push(CacheIdent::TableName(physical_table_name.clone())); + cache_keys.extend( + table_ids + .iter() + .map(|table_id| CacheIdent::TableId(*table_id)), + ); + cache_keys.extend( + table_names + .iter() + .map(|table_ref| CacheIdent::TableName((*table_ref).into())), + ); + + cache_keys + } +} diff --git a/src/common/meta/src/reconciliation/utils.rs b/src/common/meta/src/reconciliation/utils.rs index 1a28b61316..03d3c023cf 100644 --- a/src/common/meta/src/reconciliation/utils.rs +++ b/src/common/meta/src/reconciliation/utils.rs @@ -21,7 +21,7 @@ use datatypes::schema::ColumnSchema; use snafu::{ensure, OptionExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{RegionId, TableId}; -use table::metadata::RawTableMeta; +use table::metadata::{RawTableInfo, RawTableMeta}; use table::table_name::TableName; use table::table_reference::TableReference; @@ -433,6 +433,62 @@ pub(crate) async fn validate_table_id_and_name( Ok(()) } +/// Checks whether the column metadata invariants hold for the logical table. +/// +/// Invariants: +/// - Primary key (Tag) columns must exist in the new metadata. +/// - Timestamp column must remain exactly the same in name and ID. +/// +/// TODO(weny): add tests +pub(crate) fn check_column_metadatas_invariants_for_logical_table( + column_metadatas: &[ColumnMetadata], + table_info: &RawTableInfo, +) -> bool { + let new_primary_keys = column_metadatas + .iter() + .filter(|c| c.semantic_type == SemanticType::Tag) + .map(|c| c.column_schema.name.as_str()) + .collect::>(); + + let old_primary_keys = table_info + .meta + .primary_key_indices + .iter() + .map(|i| table_info.meta.schema.column_schemas[*i].name.as_str()); + + for name in old_primary_keys { + if !new_primary_keys.contains(name) { + return false; + } + } + + let old_timestamp_column_name = table_info + .meta + .schema + .column_schemas + .iter() + .find(|c| c.is_time_index()) + .map(|c| c.name.as_str()); + + let new_timestamp_column_name = column_metadatas + .iter() + .find(|c| c.semantic_type == SemanticType::Timestamp) + .map(|c| c.column_schema.name.as_str()); + + old_timestamp_column_name != new_timestamp_column_name +} + +/// Returns true if the logical table info needs to be updated. +/// +/// The logical table only support to add columns, so we can check the length of column metadatas +/// to determine whether the logical table info needs to be updated. +pub(crate) fn need_update_logical_table_info( + table_info: &RawTableInfo, + column_metadatas: &[ColumnMetadata], +) -> bool { + table_info.meta.schema.column_schemas.len() != column_metadatas.len() +} + #[derive(Clone)] pub struct Context { pub node_manager: NodeManagerRef, diff --git a/src/common/meta/src/snapshot.rs b/src/common/meta/src/snapshot.rs index e0942dde5f..08aebbc99c 100644 --- a/src/common/meta/src/snapshot.rs +++ b/src/common/meta/src/snapshot.rs @@ -165,7 +165,7 @@ const MAX_REQUEST_SIZE: usize = 1024 * 1024; /// Returns true if the key is an internal key. fn is_internal_key(kv: &FileKeyValue) -> bool { - kv.key == ELECTION_KEY.as_bytes() || kv.key == CANDIDATES_ROOT.as_bytes() + kv.key.starts_with(ELECTION_KEY.as_bytes()) || kv.key.starts_with(CANDIDATES_ROOT.as_bytes()) } impl MetadataSnapshotManager { diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs index c6551687a4..e7bd5d2063 100644 --- a/src/datatypes/src/schema/constraint.rs +++ b/src/datatypes/src/schema/constraint.rs @@ -57,6 +57,15 @@ impl TryFrom for Vec { } } +impl TryFrom<&ColumnDefaultConstraint> for Vec { + type Error = error::Error; + + fn try_from(value: &ColumnDefaultConstraint) -> std::result::Result { + let s = serde_json::to_string(value).context(error::SerializeSnafu)?; + Ok(s.into_bytes()) + } +} + impl Display for ColumnDefaultConstraint { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self {