diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index a68ff9c028..4af0c28cb8 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -403,6 +403,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Catalog not found, catalog: {}", catalog))] + CatalogNotFound { + catalog: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid metadata, err: {}", err_msg))] InvalidMetadata { err_msg: String, @@ -1062,6 +1069,7 @@ impl ErrorExt for Error { ParseProcedureId { .. } | InvalidNumTopics { .. } | SchemaNotFound { .. } + | CatalogNotFound { .. } | InvalidNodeInfoKey { .. } | InvalidStatKey { .. } | ParseNum { .. } diff --git a/src/common/meta/src/reconciliation.rs b/src/common/meta/src/reconciliation.rs index 7361a75080..568e477a80 100644 --- a/src/common/meta/src/reconciliation.rs +++ b/src/common/meta/src/reconciliation.rs @@ -23,4 +23,5 @@ pub(crate) mod reconcile_table; pub(crate) mod reconcile_logical_tables; // TODO(weny): Remove it #[allow(dead_code)] +pub(crate) mod reconcile_catalog; pub(crate) mod utils; diff --git a/src/common/meta/src/reconciliation/reconcile_catalog.rs b/src/common/meta/src/reconciliation/reconcile_catalog.rs new file mode 100644 index 0000000000..758c44c6a7 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_catalog.rs @@ -0,0 +1,198 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::fmt::Debug; + +use common_procedure::error::FromJsonSnafu; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId, + Result as ProcedureResult, Status, +}; +use futures::stream::BoxStream; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; + +use crate::cache_invalidator::CacheInvalidatorRef; +use crate::error::Result; +use crate::key::TableMetadataManagerRef; +use crate::lock_key::CatalogLock; +use crate::node_manager::NodeManagerRef; +use crate::reconciliation::reconcile_catalog::start::ReconcileCatalogStart; +use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures; +use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; +use crate::reconciliation::utils::Context; + +pub(crate) mod end; +pub(crate) mod reconcile_databases; +pub(crate) mod start; + +pub(crate) struct ReconcileCatalogContext { + pub node_manager: NodeManagerRef, + pub table_metadata_manager: TableMetadataManagerRef, + pub cache_invalidator: CacheInvalidatorRef, + persistent_ctx: PersistentContext, + volatile_ctx: VolatileContext, +} + +impl ReconcileCatalogContext { + pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self { + Self { + node_manager: ctx.node_manager, + table_metadata_manager: ctx.table_metadata_manager, + cache_invalidator: ctx.cache_invalidator, + persistent_ctx, + volatile_ctx: VolatileContext::default(), + } + } + + pub(crate) async fn wait_for_inflight_subprocedure( + &mut self, + procedure_ctx: &ProcedureContext, + ) -> Result<()> { + if let Some(procedure_id) = self.volatile_ctx.inflight_subprocedure { + wait_for_inflight_subprocedures( + procedure_ctx, + &[procedure_id], + self.persistent_ctx.fast_fail, + ) + .await?; + } + Ok(()) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct PersistentContext { + catalog: String, + fast_fail: bool, + resolve_strategy: ResolveStrategy, +} + +impl PersistentContext { + pub fn new(catalog: String, fast_fail: bool, resolve_strategy: ResolveStrategy) -> Self { + Self { + catalog, + fast_fail, + resolve_strategy, + } + } +} + +#[derive(Default)] +pub(crate) struct VolatileContext { + /// Stores the stream of catalogs. + schemas: Option>>, + /// Stores the inflight subprocedure. + inflight_subprocedure: Option, +} + +pub struct ReconcileCatalogProcedure { + pub context: ReconcileCatalogContext, + state: Box, +} + +impl ReconcileCatalogProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileCatalog"; + + pub fn new( + ctx: Context, + catalog: String, + fast_fail: bool, + resolve_strategy: ResolveStrategy, + ) -> Self { + let persistent_ctx = PersistentContext::new(catalog, fast_fail, resolve_strategy); + let context = ReconcileCatalogContext::new(ctx, persistent_ctx); + let state = Box::new(ReconcileCatalogStart); + Self { context, state } + } + + pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult { + let ProcedureDataOwned { + state, + persistent_ctx, + } = serde_json::from_str(json).context(FromJsonSnafu)?; + let context = ReconcileCatalogContext::new(ctx, persistent_ctx); + Ok(Self { context, state }) + } +} + +#[derive(Debug, Serialize)] +struct ProcedureData<'a> { + state: &'a dyn State, + persistent_ctx: &'a PersistentContext, +} + +#[derive(Debug, Deserialize)] +struct ProcedureDataOwned { + state: Box, + persistent_ctx: PersistentContext, +} + +#[async_trait::async_trait] +impl Procedure for ReconcileCatalogProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &mut self.state; + + match state.next(&mut self.context, _ctx).await { + Ok((next, status)) => { + *state = next; + Ok(status) + } + Err(e) => { + if e.is_retry_later() { + Err(ProcedureError::retry_later(e)) + } else { + Err(ProcedureError::external(e)) + } + } + } + } + + fn dump(&self) -> ProcedureResult { + let data = ProcedureData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(FromJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let catalog = &self.context.persistent_ctx.catalog; + + LockKey::new(vec![CatalogLock::Write(catalog).into()]) + } +} + +#[async_trait::async_trait] +#[typetag::serde(tag = "reconcile_catalog_state")] +pub(crate) trait State: Sync + Send + Debug { + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + + async fn next( + &mut self, + ctx: &mut ReconcileCatalogContext, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)>; + + fn as_any(&self) -> &dyn Any; +} diff --git a/src/common/meta/src/reconciliation/reconcile_catalog/end.rs b/src/common/meta/src/reconciliation/reconcile_catalog/end.rs new file mode 100644 index 0000000000..e259bb38b3 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_catalog/end.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ReconcileCatalogEnd; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileCatalogEnd { + async fn next( + &mut self, + _ctx: &mut ReconcileCatalogContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + Ok((Box::new(ReconcileCatalogEnd), Status::done())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs b/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs new file mode 100644 index 0000000000..cccdbc15de --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs @@ -0,0 +1,94 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status}; +use futures::TryStreamExt; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd; +use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State}; +use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM}; +use crate::reconciliation::utils::Context; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ReconcileDatabases; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileDatabases { + async fn next( + &mut self, + ctx: &mut ReconcileCatalogContext, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + // Waits for inflight subprocedure first. + ctx.wait_for_inflight_subprocedure(procedure_ctx).await?; + + if ctx.volatile_ctx.schemas.as_deref().is_none() { + let schemas = ctx + .table_metadata_manager + .schema_manager() + .schema_names(&ctx.persistent_ctx.catalog); + ctx.volatile_ctx.schemas = Some(schemas); + } + + if let Some(catalog) = ctx + .volatile_ctx + .schemas + .as_mut() + .unwrap() + .try_next() + .await? + { + return Self::schedule_reconcile_database(ctx, catalog); + } + + Ok((Box::new(ReconcileCatalogEnd), Status::executing(false))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl ReconcileDatabases { + fn schedule_reconcile_database( + ctx: &mut ReconcileCatalogContext, + schema: String, + ) -> Result<(Box, Status)> { + let context = Context { + node_manager: ctx.node_manager.clone(), + table_metadata_manager: ctx.table_metadata_manager.clone(), + cache_invalidator: ctx.cache_invalidator.clone(), + }; + let procedure = ReconcileDatabaseProcedure::new( + context, + ctx.persistent_ctx.catalog.clone(), + schema, + ctx.persistent_ctx.fast_fail, + DEFAULT_PARALLELISM, + ctx.persistent_ctx.resolve_strategy, + true, + ); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + Ok(( + Box::new(ReconcileDatabases), + Status::suspended(vec![procedure_with_id], false), + )) + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_catalog/start.rs b/src/common/meta/src/reconciliation/reconcile_catalog/start.rs new file mode 100644 index 0000000000..03fd4cda04 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_catalog/start.rs @@ -0,0 +1,58 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; +use snafu::ensure; + +use crate::error::{self, Result}; +use crate::key::catalog_name::CatalogNameKey; +use crate::reconciliation::reconcile_catalog::reconcile_databases::ReconcileDatabases; +use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ReconcileCatalogStart; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileCatalogStart { + async fn next( + &mut self, + ctx: &mut ReconcileCatalogContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let exists = ctx + .table_metadata_manager + .catalog_manager() + .exists(CatalogNameKey { + catalog: &ctx.persistent_ctx.catalog, + }) + .await?; + + ensure!( + exists, + error::CatalogNotFoundSnafu { + catalog: &ctx.persistent_ctx.catalog + }, + ); + + Ok((Box::new(ReconcileDatabases), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_database.rs b/src/common/meta/src/reconciliation/reconcile_database.rs index 6d2af40dec..195d915cd7 100644 --- a/src/common/meta/src/reconciliation/reconcile_database.rs +++ b/src/common/meta/src/reconciliation/reconcile_database.rs @@ -45,6 +45,8 @@ use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subproce use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; use crate::reconciliation::utils::Context; +pub(crate) const DEFAULT_PARALLELISM: usize = 64; + pub(crate) struct ReconcileDatabaseContext { pub node_manager: NodeManagerRef, pub table_metadata_manager: TableMetadataManagerRef, @@ -89,6 +91,7 @@ pub(crate) struct PersistentContext { fail_fast: bool, parallelism: usize, resolve_strategy: ResolveStrategy, + is_subprocedure: bool, } impl PersistentContext { @@ -98,6 +101,7 @@ impl PersistentContext { fail_fast: bool, parallelism: usize, resolve_strategy: ResolveStrategy, + is_subprocedure: bool, ) -> Self { Self { catalog, @@ -105,6 +109,7 @@ impl PersistentContext { fail_fast, parallelism, resolve_strategy, + is_subprocedure, } } } @@ -139,9 +144,16 @@ impl ReconcileDatabaseProcedure { fail_fast: bool, parallelism: usize, resolve_strategy: ResolveStrategy, + is_subprocedure: bool, ) -> Self { - let persistent_ctx = - PersistentContext::new(catalog, schema, fail_fast, parallelism, resolve_strategy); + let persistent_ctx = PersistentContext::new( + catalog, + schema, + fail_fast, + parallelism, + resolve_strategy, + is_subprocedure, + ); let context = ReconcileDatabaseContext::new(ctx, persistent_ctx); let state = Box::new(ReconcileDatabaseStart); Self { context, state } @@ -204,6 +216,10 @@ impl Procedure for ReconcileDatabaseProcedure { fn lock_key(&self) -> LockKey { let catalog = &self.context.persistent_ctx.catalog; let schema = &self.context.persistent_ctx.schema; + // If the procedure is a subprocedure, only lock the schema. + if self.context.persistent_ctx.is_subprocedure { + return LockKey::new(vec![SchemaLock::write(catalog, schema).into()]); + } LockKey::new(vec![ CatalogLock::Read(catalog).into(), diff --git a/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs b/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs index 3a3a718b6e..985a8f64fa 100644 --- a/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs +++ b/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs @@ -28,6 +28,7 @@ use crate::error::{Result, TableInfoNotFoundSnafu}; use crate::key::table_route::TableRouteValue; use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd; use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State}; +use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure; use crate::reconciliation::utils::Context; #[derive(Debug, Serialize, Deserialize)] @@ -201,7 +202,7 @@ impl ReconcileLogicalTables { async fn build_reconcile_logical_tables_procedure( ctx: &Context, physical_table_id: TableId, - _logical_tables: Vec<(TableId, TableName)>, + logical_tables: Vec<(TableId, TableName)>, ) -> Result { let table_info = ctx .table_metadata_manager @@ -212,8 +213,16 @@ impl ReconcileLogicalTables { table: format!("table_id: {}", physical_table_id), })?; - let _physical_table_name = table_info.table_name(); - todo!() + let physical_table_name = table_info.table_name(); + let procedure = ReconcileLogicalTablesProcedure::new( + ctx.clone(), + physical_table_id, + physical_table_name, + logical_tables, + true, + ); + + Ok(ProcedureWithId::with_random_id(Box::new(procedure))) } fn enqueue_logical_table( diff --git a/src/common/meta/src/reconciliation/utils.rs b/src/common/meta/src/reconciliation/utils.rs index 03d3c023cf..5610651913 100644 --- a/src/common/meta/src/reconciliation/utils.rs +++ b/src/common/meta/src/reconciliation/utils.rs @@ -22,14 +22,12 @@ use snafu::{ensure, OptionExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{RegionId, TableId}; use table::metadata::{RawTableInfo, RawTableMeta}; -use table::table_name::TableName; use table::table_reference::TableReference; use crate::cache_invalidator::CacheInvalidatorRef; use crate::error::{ - self, MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu, + MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu, }; -use crate::key::table_name::{TableNameKey, TableNameManager}; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; @@ -397,87 +395,6 @@ pub(crate) fn build_table_meta_from_column_metadatas( Ok(new_raw_table_meta) } -/// Validates the table id and name consistency. -/// -/// It will check the table id and table name consistency. -/// If the table id and table name are not consistent, it will return an error. -pub(crate) async fn validate_table_id_and_name( - table_name_manager: &TableNameManager, - table_id: TableId, - table_name: &TableName, -) -> Result<()> { - let table_name_key = TableNameKey::new( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ); - let table_name_value = table_name_manager - .get(table_name_key) - .await? - .with_context(|| error::TableNotFoundSnafu { - table_name: table_name.to_string(), - })?; - - ensure!( - table_name_value.table_id() == table_id, - error::UnexpectedSnafu { - err_msg: format!( - "The table id mismatch for table: {}, expected {}, actual {}", - table_name, - table_id, - table_name_value.table_id() - ), - } - ); - - Ok(()) -} - -/// Checks whether the column metadata invariants hold for the logical table. -/// -/// Invariants: -/// - Primary key (Tag) columns must exist in the new metadata. -/// - Timestamp column must remain exactly the same in name and ID. -/// -/// TODO(weny): add tests -pub(crate) fn check_column_metadatas_invariants_for_logical_table( - column_metadatas: &[ColumnMetadata], - table_info: &RawTableInfo, -) -> bool { - let new_primary_keys = column_metadatas - .iter() - .filter(|c| c.semantic_type == SemanticType::Tag) - .map(|c| c.column_schema.name.as_str()) - .collect::>(); - - let old_primary_keys = table_info - .meta - .primary_key_indices - .iter() - .map(|i| table_info.meta.schema.column_schemas[*i].name.as_str()); - - for name in old_primary_keys { - if !new_primary_keys.contains(name) { - return false; - } - } - - let old_timestamp_column_name = table_info - .meta - .schema - .column_schemas - .iter() - .find(|c| c.is_time_index()) - .map(|c| c.name.as_str()); - - let new_timestamp_column_name = column_metadatas - .iter() - .find(|c| c.semantic_type == SemanticType::Timestamp) - .map(|c| c.column_schema.name.as_str()); - - old_timestamp_column_name != new_timestamp_column_name -} - /// Returns true if the logical table info needs to be updated. /// /// The logical table only support to add columns, so we can check the length of column metadatas