From d3a1c80fbde74e0b76cfd83a9720cbfcf84dff59 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 31 Jul 2025 19:01:56 +0800 Subject: [PATCH] feat: introduce reconcile database procedure (#6612) * feat: introduce reconcile database procedure Signed-off-by: WenyXu * feat: hold the schema lock Signed-off-by: WenyXu * chore: add todo Signed-off-by: WenyXu * chore: update comments Signed-off-by: WenyXu * chore: rename to `fast_fail` Signed-off-by: WenyXu * chore: add logs Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 1 + src/common/meta/src/error.rs | 18 ++ src/common/meta/src/key/table_route.rs | 11 + src/common/meta/src/reconciliation.rs | 3 + .../src/reconciliation/reconcile_database.rs | 231 +++++++++++++++++ .../reconciliation/reconcile_database/end.rs | 40 +++ .../reconcile_logical_tables.rs | 234 ++++++++++++++++++ .../reconcile_database/reconcile_tables.rs | 164 ++++++++++++ .../reconcile_database/start.rs | 63 +++++ .../reconcile_database/utils.rs | 79 ++++++ .../src/reconciliation/reconcile_table.rs | 19 +- src/common/procedure-test/Cargo.toml | 1 + src/common/procedure-test/src/lib.rs | 8 + src/common/procedure/src/local.rs | 15 ++ src/common/procedure/src/local/runner.rs | 8 + src/common/procedure/src/procedure.rs | 14 ++ 16 files changed, 908 insertions(+), 1 deletion(-) create mode 100644 src/common/meta/src/reconciliation/reconcile_database.rs create mode 100644 src/common/meta/src/reconciliation/reconcile_database/end.rs create mode 100644 src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs create mode 100644 src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs create mode 100644 src/common/meta/src/reconciliation/reconcile_database/start.rs create mode 100644 src/common/meta/src/reconciliation/reconcile_database/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 8da67fdb60..021ae86a86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2593,6 +2593,7 @@ dependencies = [ "async-trait", "common-procedure", "snafu 0.8.5", + "tokio", ] [[package]] diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index ab763dadcb..a68ff9c028 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use common_procedure::ProcedureId; use common_wal::options::WalOptions; use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; @@ -140,6 +141,21 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))] + ProcedureStateReceiver { + procedure_id: ProcedureId, + #[snafu(implicit)] + location: Location, + source: common_procedure::Error, + }, + + #[snafu(display("Procedure state receiver not found: {procedure_id}"))] + ProcedureStateReceiverNotFound { + procedure_id: ProcedureId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to wait procedure done"))] WaitProcedure { #[snafu(implicit)] @@ -952,6 +968,7 @@ impl ErrorExt for Error { NoLeader { .. } => StatusCode::TableUnavailable, ValueNotExist { .. } | ProcedurePoisonConflict { .. } + | ProcedureStateReceiverNotFound { .. } | MissingColumnIds { .. } | MissingColumnInColumnMetadata { .. } | MismatchColumnId { .. } @@ -1040,6 +1057,7 @@ impl ErrorExt for Error { ConvertAlterTableRequest { source, .. } => source.status_code(), PutPoison { source, .. } => source.status_code(), ConvertColumnDef { source, .. } => source.status_code(), + ProcedureStateReceiver { source, .. } => source.status_code(), ParseProcedureId { .. } | InvalidNumTopics { .. } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 94d2a0bf07..27097d52c2 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -184,6 +184,17 @@ impl TableRouteValue { } } + /// Converts to [`LogicalTableRouteValue`]. + /// + /// # Panic + /// If it is not the [`LogicalTableRouteValue`]. + pub fn into_logical_table_route(self) -> LogicalTableRouteValue { + match self { + TableRouteValue::Logical(x) => x, + _ => unreachable!("Mistakenly been treated as a Logical TableRoute: {self:?}"), + } + } + pub fn region_numbers(&self) -> Vec { match self { TableRouteValue::Physical(x) => x diff --git a/src/common/meta/src/reconciliation.rs b/src/common/meta/src/reconciliation.rs index 8332a3a90f..9d0debd3ea 100644 --- a/src/common/meta/src/reconciliation.rs +++ b/src/common/meta/src/reconciliation.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +// TODO(weny): Remove it +#[allow(dead_code)] +pub(crate) mod reconcile_database; // TODO(weny): Remove it #[allow(dead_code)] pub(crate) mod reconcile_table; diff --git a/src/common/meta/src/reconciliation/reconcile_database.rs b/src/common/meta/src/reconciliation/reconcile_database.rs new file mode 100644 index 0000000000..6d2af40dec --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_database.rs @@ -0,0 +1,231 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod end; +pub(crate) mod reconcile_logical_tables; +pub(crate) mod reconcile_tables; +pub(crate) mod start; +pub(crate) mod utils; + +use std::any::Any; +use std::collections::HashMap; +use std::fmt::Debug; + +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId, + Result as ProcedureResult, Status, +}; +use futures::stream::BoxStream; +use serde::{Deserialize, Serialize}; +use snafu::ResultExt; +use store_api::storage::TableId; +use table::table_name::TableName; + +use crate::cache_invalidator::CacheInvalidatorRef; +use crate::error::Result; +use crate::key::table_name::TableNameValue; +use crate::key::TableMetadataManagerRef; +use crate::lock_key::{CatalogLock, SchemaLock}; +use crate::node_manager::NodeManagerRef; +use crate::reconciliation::reconcile_database::start::ReconcileDatabaseStart; +use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures; +use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; +use crate::reconciliation::utils::Context; + +pub(crate) struct ReconcileDatabaseContext { + pub node_manager: NodeManagerRef, + pub table_metadata_manager: TableMetadataManagerRef, + pub cache_invalidator: CacheInvalidatorRef, + persistent_ctx: PersistentContext, + volatile_ctx: VolatileContext, +} + +impl ReconcileDatabaseContext { + pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self { + Self { + node_manager: ctx.node_manager, + table_metadata_manager: ctx.table_metadata_manager, + cache_invalidator: ctx.cache_invalidator, + persistent_ctx, + volatile_ctx: VolatileContext::default(), + } + } + + pub(crate) async fn wait_for_inflight_subprocedures( + &mut self, + procedure_ctx: &ProcedureContext, + ) -> Result<()> { + if !self.volatile_ctx.inflight_subprocedures.is_empty() { + wait_for_inflight_subprocedures( + procedure_ctx, + &self.volatile_ctx.inflight_subprocedures, + self.persistent_ctx.fail_fast, + ) + .await?; + self.volatile_ctx.inflight_subprocedures.clear(); + } + + Ok(()) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct PersistentContext { + catalog: String, + schema: String, + fail_fast: bool, + parallelism: usize, + resolve_strategy: ResolveStrategy, +} + +impl PersistentContext { + pub fn new( + catalog: String, + schema: String, + fail_fast: bool, + parallelism: usize, + resolve_strategy: ResolveStrategy, + ) -> Self { + Self { + catalog, + schema, + fail_fast, + parallelism, + resolve_strategy, + } + } +} + +#[derive(Default)] +pub(crate) struct VolatileContext { + /// Stores pending physical tables. + pending_tables: Vec<(TableId, TableName)>, + /// Stores pending logical tables associated with each physical table. + /// + /// - Key: Table ID of the physical table. + /// - Value: Vector of (TableId, TableName) tuples representing logical tables belonging to the physical table. + pending_logical_tables: HashMap>, + /// Stores inflight subprocedures. + inflight_subprocedures: Vec, + /// Stores the stream of tables. + tables: Option>>, +} + +pub struct ReconcileDatabaseProcedure { + pub context: ReconcileDatabaseContext, + state: Box, +} + +impl ReconcileDatabaseProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileDatabase"; + + pub fn new( + ctx: Context, + catalog: String, + schema: String, + fail_fast: bool, + parallelism: usize, + resolve_strategy: ResolveStrategy, + ) -> Self { + let persistent_ctx = + PersistentContext::new(catalog, schema, fail_fast, parallelism, resolve_strategy); + let context = ReconcileDatabaseContext::new(ctx, persistent_ctx); + let state = Box::new(ReconcileDatabaseStart); + Self { context, state } + } + + pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult { + let ProcedureDataOwned { + state, + persistent_ctx, + } = serde_json::from_str(json).context(FromJsonSnafu)?; + let context = ReconcileDatabaseContext::new(ctx, persistent_ctx); + Ok(Self { context, state }) + } +} + +#[derive(Debug, Serialize)] +struct ProcedureData<'a> { + state: &'a dyn State, + persistent_ctx: &'a PersistentContext, +} + +#[derive(Debug, Deserialize)] +struct ProcedureDataOwned { + state: Box, + persistent_ctx: PersistentContext, +} + +#[async_trait] +impl Procedure for ReconcileDatabaseProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &mut self.state; + + match state.next(&mut self.context, _ctx).await { + Ok((next, status)) => { + *state = next; + Ok(status) + } + Err(e) => { + if e.is_retry_later() { + Err(ProcedureError::retry_later(e)) + } else { + Err(ProcedureError::external(e)) + } + } + } + } + + fn dump(&self) -> ProcedureResult { + let data = ProcedureData { + state: self.state.as_ref(), + persistent_ctx: &self.context.persistent_ctx, + }; + serde_json::to_string(&data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let catalog = &self.context.persistent_ctx.catalog; + let schema = &self.context.persistent_ctx.schema; + + LockKey::new(vec![ + CatalogLock::Read(catalog).into(), + SchemaLock::write(catalog, schema).into(), + ]) + } +} + +#[async_trait::async_trait] +#[typetag::serde(tag = "reconcile_database_state")] +pub(crate) trait State: Sync + Send + Debug { + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + + async fn next( + &mut self, + ctx: &mut ReconcileDatabaseContext, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)>; + + fn as_any(&self) -> &dyn Any; +} diff --git a/src/common/meta/src/reconciliation/reconcile_database/end.rs b/src/common/meta/src/reconciliation/reconcile_database/end.rs new file mode 100644 index 0000000000..9d9bd75fc9 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_database/end.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ReconcileDatabaseEnd; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileDatabaseEnd { + async fn next( + &mut self, + _ctx: &mut ReconcileDatabaseContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + Ok((Box::new(ReconcileDatabaseEnd), Status::done())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs b/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs new file mode 100644 index 0000000000..3a3a718b6e --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs @@ -0,0 +1,234 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; + +use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status}; +use common_telemetry::info; +use futures::TryStreamExt; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use table::metadata::TableId; +use table::table_name::TableName; +use table::table_reference::TableReference; + +use crate::error::{Result, TableInfoNotFoundSnafu}; +use crate::key::table_route::TableRouteValue; +use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd; +use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State}; +use crate::reconciliation::utils::Context; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ReconcileLogicalTables; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileLogicalTables { + async fn next( + &mut self, + ctx: &mut ReconcileDatabaseContext, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + info!( + "Reconcile logical tables in database: {}, catalog: {}, inflight_subprocedures: {}", + ctx.persistent_ctx.schema, + ctx.persistent_ctx.catalog, + ctx.volatile_ctx.inflight_subprocedures.len() + ); + // Waits for inflight subprocedures first. + ctx.wait_for_inflight_subprocedures(procedure_ctx).await?; + + let catalog = &ctx.persistent_ctx.catalog; + let schema = &ctx.persistent_ctx.schema; + let parallelism = ctx.persistent_ctx.parallelism; + if ctx.volatile_ctx.tables.as_deref().is_none() { + let tables = ctx + .table_metadata_manager + .table_name_manager() + .tables(catalog, schema); + ctx.volatile_ctx.tables = Some(tables); + } + + let pending_logical_tables = &mut ctx.volatile_ctx.pending_logical_tables; + let mut pending_procedures = Vec::with_capacity(parallelism); + let context = Context { + node_manager: ctx.node_manager.clone(), + table_metadata_manager: ctx.table_metadata_manager.clone(), + cache_invalidator: ctx.cache_invalidator.clone(), + }; + // Safety: initialized above. + while let Some((table_name, table_name_value)) = + ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await? + { + let table_id = table_name_value.table_id(); + let Some(table_route) = ctx + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get(table_id) + .await? + else { + continue; + }; + + let table_ref = TableReference::full(catalog, schema, &table_name); + Self::enqueue_logical_table(pending_logical_tables, table_id, table_ref, table_route); + // Try to build reconcile logical tables procedure. + if let Some(procedure) = Self::try_build_reconcile_logical_tables_procedure( + &context, + pending_logical_tables, + parallelism, + ) + .await? + { + pending_procedures.push(procedure); + } + // Schedule reconcile logical tables procedures if the number of pending procedures + // is greater than or equal to parallelism. + if Self::should_schedule_reconcile_logical_tables(&pending_procedures, parallelism) { + return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures); + } + } + + // Build remaining procedures. + Self::build_remaining_procedures( + &context, + pending_logical_tables, + &mut pending_procedures, + parallelism, + ) + .await?; + // If there are remaining procedures, schedule reconcile logical tables procedures. + if !pending_procedures.is_empty() { + return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures); + } + + ctx.volatile_ctx.tables.take(); + Ok((Box::new(ReconcileDatabaseEnd), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl ReconcileLogicalTables { + fn schedule_reconcile_logical_tables( + ctx: &mut ReconcileDatabaseContext, + buffer: &mut Vec, + ) -> Result<(Box, Status)> { + let procedures = std::mem::take(buffer); + ctx.volatile_ctx + .inflight_subprocedures + .extend(procedures.iter().map(|p| p.id)); + + Ok(( + Box::new(ReconcileLogicalTables), + Status::suspended(procedures, false), + )) + } + + fn should_schedule_reconcile_logical_tables( + buffer: &[ProcedureWithId], + parallelism: usize, + ) -> bool { + buffer.len() >= parallelism + } + + async fn try_build_reconcile_logical_tables_procedure( + ctx: &Context, + pending_logical_tables: &mut HashMap>, + parallelism: usize, + ) -> Result> { + let mut physical_table_id = None; + for (table_id, tables) in pending_logical_tables.iter() { + if tables.len() >= parallelism { + physical_table_id = Some(*table_id); + break; + } + } + + if let Some(physical_table_id) = physical_table_id { + // Safety: Checked above. + let tables = pending_logical_tables.remove(&physical_table_id).unwrap(); + return Ok(Some( + Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables) + .await?, + )); + } + + Ok(None) + } + + async fn build_remaining_procedures( + ctx: &Context, + pending_logical_tables: &mut HashMap>, + pending_procedures: &mut Vec, + parallelism: usize, + ) -> Result<()> { + if pending_logical_tables.is_empty() { + return Ok(()); + } + + while let Some(physical_table_id) = pending_logical_tables.keys().next().cloned() { + if pending_procedures.len() >= parallelism { + return Ok(()); + } + + // Safety: Checked above. + let tables = pending_logical_tables.remove(&physical_table_id).unwrap(); + pending_procedures.push( + Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables) + .await?, + ); + } + + Ok(()) + } + + async fn build_reconcile_logical_tables_procedure( + ctx: &Context, + physical_table_id: TableId, + _logical_tables: Vec<(TableId, TableName)>, + ) -> Result { + let table_info = ctx + .table_metadata_manager + .table_info_manager() + .get(physical_table_id) + .await? + .context(TableInfoNotFoundSnafu { + table: format!("table_id: {}", physical_table_id), + })?; + + let _physical_table_name = table_info.table_name(); + todo!() + } + + fn enqueue_logical_table( + tables: &mut HashMap>, + table_id: TableId, + table_ref: TableReference<'_>, + table_route: TableRouteValue, + ) { + if !table_route.is_physical() { + let logical_table_route = table_route.into_logical_table_route(); + let physical_table_id = logical_table_route.physical_table_id(); + tables + .entry(physical_table_id) + .or_default() + .push((table_id, table_ref.into())); + } + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs b/src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs new file mode 100644 index 0000000000..fa26b5ecb5 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status}; +use common_telemetry::info; +use futures::TryStreamExt; +use serde::{Deserialize, Serialize}; +use store_api::storage::TableId; +use table::table_name::TableName; +use table::table_reference::TableReference; + +use crate::error::Result; +use crate::key::table_route::TableRouteValue; +use crate::reconciliation::reconcile_database::reconcile_logical_tables::ReconcileLogicalTables; +use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State}; +use crate::reconciliation::reconcile_table::ReconcileTableProcedure; +use crate::reconciliation::utils::Context; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ReconcileTables; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileTables { + async fn next( + &mut self, + ctx: &mut ReconcileDatabaseContext, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + info!( + "Reconcile tables in database: {}, catalog: {}, inflight_subprocedures: {}", + ctx.persistent_ctx.schema, + ctx.persistent_ctx.catalog, + ctx.volatile_ctx.inflight_subprocedures.len() + ); + // Waits for inflight subprocedures first. + ctx.wait_for_inflight_subprocedures(procedure_ctx).await?; + + let catalog = &ctx.persistent_ctx.catalog; + let schema = &ctx.persistent_ctx.schema; + let parallelism = ctx.persistent_ctx.parallelism; + if ctx.volatile_ctx.tables.as_deref().is_none() { + let tables = ctx + .table_metadata_manager + .table_name_manager() + .tables(catalog, schema); + ctx.volatile_ctx.tables = Some(tables); + } + + let pending_tables = &mut ctx.volatile_ctx.pending_tables; + // Safety: must exists. + while let Some((table_name, table_name_value)) = + ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await? + { + let table_id = table_name_value.table_id(); + let Some(table_route) = ctx + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get(table_id) + .await? + else { + continue; + }; + + let table_ref = TableReference::full(catalog, schema, &table_name); + // Enqueue table. + Self::enqueue_table(pending_tables, table_id, table_ref, table_route); + // Schedule reconcile table procedures if the number of pending procedures + // is greater than or equal to parallelism. + if Self::should_schedule_reconcile_tables(pending_tables, parallelism) { + return Self::schedule_reconcile_tables(ctx); + } + } + + // If there are remaining tables, schedule reconcile table procedures. + if !pending_tables.is_empty() { + return Self::schedule_reconcile_tables(ctx); + } + ctx.volatile_ctx.tables.take(); + Ok((Box::new(ReconcileLogicalTables), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl ReconcileTables { + fn schedule_reconcile_tables( + ctx: &mut ReconcileDatabaseContext, + ) -> Result<(Box, Status)> { + let tables = std::mem::take(&mut ctx.volatile_ctx.pending_tables); + let subprocedures = Self::build_reconcile_table_procedures(ctx, tables); + ctx.volatile_ctx + .inflight_subprocedures + .extend(subprocedures.iter().map(|p| p.id)); + + Ok(( + Box::new(ReconcileTables), + Status::suspended(subprocedures, false), + )) + } + + fn should_schedule_reconcile_tables( + pending_tables: &[(TableId, TableName)], + parallelism: usize, + ) -> bool { + pending_tables.len() >= parallelism + } + + fn build_reconcile_table_procedures( + ctx: &ReconcileDatabaseContext, + tables: Vec<(TableId, TableName)>, + ) -> Vec { + let mut procedures = Vec::with_capacity(tables.len()); + for (table_id, table_name) in tables { + let context = Context { + node_manager: ctx.node_manager.clone(), + table_metadata_manager: ctx.table_metadata_manager.clone(), + cache_invalidator: ctx.cache_invalidator.clone(), + }; + let procedure = ReconcileTableProcedure::new( + context, + table_id, + table_name.clone(), + ctx.persistent_ctx.resolve_strategy, + true, + ); + let procedure = ProcedureWithId::with_random_id(Box::new(procedure)); + info!( + "Reconcile table: {}, table_id: {}, procedure_id: {}", + table_name, table_id, procedure.id + ); + procedures.push(procedure) + } + + procedures + } + + fn enqueue_table( + tables: &mut Vec<(TableId, TableName)>, + table_id: TableId, + table_ref: TableReference<'_>, + table_route: TableRouteValue, + ) { + if table_route.is_physical() { + tables.push((table_id, table_ref.into())); + } + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_database/start.rs b/src/common/meta/src/reconciliation/reconcile_database/start.rs new file mode 100644 index 0000000000..a1d6f38c05 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_database/start.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use snafu::ensure; + +use crate::error::{self, Result}; +use crate::key::schema_name::SchemaNameKey; +use crate::reconciliation::reconcile_database::reconcile_tables::ReconcileTables; +use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State}; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ReconcileDatabaseStart; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ReconcileDatabaseStart { + async fn next( + &mut self, + ctx: &mut ReconcileDatabaseContext, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let exists = ctx + .table_metadata_manager + .schema_manager() + .exists(SchemaNameKey { + catalog: &ctx.persistent_ctx.catalog, + schema: &ctx.persistent_ctx.schema, + }) + .await?; + + ensure!( + exists, + error::SchemaNotFoundSnafu { + table_schema: &ctx.persistent_ctx.schema, + }, + ); + info!( + "Reconcile database: {}, catalog: {}", + ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog + ); + Ok((Box::new(ReconcileTables), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_database/utils.rs b/src/common/meta/src/reconciliation/reconcile_database/utils.rs new file mode 100644 index 0000000000..fccdaeec98 --- /dev/null +++ b/src/common/meta/src/reconciliation/reconcile_database/utils.rs @@ -0,0 +1,79 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_procedure::{watcher, Context as ProcedureContext, ProcedureId}; +use common_telemetry::{error, info, warn}; +use futures::future::{join_all, try_join_all}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + ProcedureStateReceiverNotFoundSnafu, ProcedureStateReceiverSnafu, Result, WaitProcedureSnafu, +}; + +/// Wait for inflight subprocedures. +/// +/// If `fail_fast` is true, the function will return an error if any subprocedure fails. +/// Otherwise, the function will continue waiting for all subprocedures to complete. +pub(crate) async fn wait_for_inflight_subprocedures( + procedure_ctx: &ProcedureContext, + subprocedures: &[ProcedureId], + fail_fast: bool, +) -> Result<()> { + let mut receivers = Vec::with_capacity(subprocedures.len()); + for procedure_id in subprocedures { + let receiver = procedure_ctx + .provider + .procedure_state_receiver(*procedure_id) + .await + .context(ProcedureStateReceiverSnafu { + procedure_id: *procedure_id, + })? + .context(ProcedureStateReceiverNotFoundSnafu { + procedure_id: *procedure_id, + })?; + receivers.push(receiver); + } + + let mut tasks = Vec::with_capacity(receivers.len()); + for receiver in receivers.iter_mut() { + let fut = watcher::wait(receiver); + tasks.push(fut); + } + + if fail_fast { + try_join_all(tasks).await.context(WaitProcedureSnafu)?; + } else { + let mut failed = 0; + let total = tasks.len(); + for result in join_all(tasks).await { + if let Err(e) = result { + error!(e; "inflight subprocedure, procedure_id: {}", procedure_ctx.procedure_id); + failed += 1; + } + } + if failed > 0 { + warn!( + "{} inflight subprocedures failed, total: {}, procedure_id: {}", + failed, total, procedure_ctx.procedure_id + ); + } else { + info!( + "{} inflight subprocedures completed, procedure_id: {}", + total, procedure_ctx.procedure_id + ); + } + } + + Ok(()) +} diff --git a/src/common/meta/src/reconciliation/reconcile_table.rs b/src/common/meta/src/reconciliation/reconcile_table.rs index 2816355c00..afd1877ca3 100644 --- a/src/common/meta/src/reconciliation/reconcile_table.rs +++ b/src/common/meta/src/reconciliation/reconcile_table.rs @@ -85,6 +85,8 @@ pub(crate) struct PersistentContext { // The physical table route. // The value will be set in `ReconciliationStart` state. pub(crate) physical_table_route: Option, + // Whether the procedure is a subprocedure. + pub(crate) is_subprocedure: bool, } impl PersistentContext { @@ -92,6 +94,7 @@ impl PersistentContext { table_id: TableId, table_name: TableName, resolve_strategy: ResolveStrategy, + is_subprocedure: bool, ) -> Self { Self { table_id, @@ -99,6 +102,7 @@ impl PersistentContext { resolve_strategy, table_info_value: None, physical_table_route: None, + is_subprocedure, } } } @@ -143,8 +147,10 @@ impl ReconcileTableProcedure { table_id: TableId, table_name: TableName, resolve_strategy: ResolveStrategy, + is_subprocedure: bool, ) -> Self { - let persistent_ctx = PersistentContext::new(table_id, table_name, resolve_strategy); + let persistent_ctx = + PersistentContext::new(table_id, table_name, resolve_strategy, is_subprocedure); let context = ReconcileTableContext::new(ctx, persistent_ctx); let state = Box::new(ReconciliationStart); Self { context, state } @@ -211,6 +217,17 @@ impl Procedure for ReconcileTableProcedure { fn lock_key(&self) -> LockKey { let table_ref = &self.context.table_name().table_ref(); + if self.context.persistent_ctx.is_subprocedure { + // The catalog and schema are already locked by the parent procedure. + // Only lock the table name. + return LockKey::new(vec![TableNameLock::new( + table_ref.catalog, + table_ref.schema, + table_ref.table, + ) + .into()]); + } + LockKey::new(vec![ CatalogLock::Read(table_ref.catalog).into(), SchemaLock::read(table_ref.catalog, table_ref.schema).into(), diff --git a/src/common/procedure-test/Cargo.toml b/src/common/procedure-test/Cargo.toml index 07c8436646..e445a235bc 100644 --- a/src/common/procedure-test/Cargo.toml +++ b/src/common/procedure-test/Cargo.toml @@ -11,3 +11,4 @@ workspace = true async-trait.workspace = true common-procedure = { workspace = true, features = ["testing"] } snafu.workspace = true +tokio.workspace = true diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs index 9e98a4972e..f7a3ecb710 100644 --- a/src/common/procedure-test/src/lib.rs +++ b/src/common/procedure-test/src/lib.rs @@ -26,6 +26,7 @@ use common_procedure::{ Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Result, Status, StringKey, }; +use tokio::sync::watch::Receiver; /// A Mock [ContextProvider]. #[derive(Default)] @@ -57,6 +58,13 @@ impl ContextProvider for MockContextProvider { Ok(self.states.get(&procedure_id).cloned()) } + async fn procedure_state_receiver( + &self, + _procedure_id: ProcedureId, + ) -> Result>> { + Ok(None) + } + async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> { self.poison_manager .try_put_poison(key.to_string(), procedure_id.to_string()) diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index d9aaa8ea8a..9b80884436 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -247,6 +247,13 @@ impl ContextProvider for ManagerContext { Ok(self.state(procedure_id)) } + async fn procedure_state_receiver( + &self, + procedure_id: ProcedureId, + ) -> Result>> { + Ok(self.state_receiver(procedure_id)) + } + async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> { { // validate the procedure exists @@ -345,6 +352,14 @@ impl ManagerContext { procedures.get(&procedure_id).map(|meta| meta.state()) } + /// Returns the [Receiver] of specific `procedure_id`. + fn state_receiver(&self, procedure_id: ProcedureId) -> Option> { + let procedures = self.procedures.read().unwrap(); + procedures + .get(&procedure_id) + .map(|meta| meta.state_receiver.clone()) + } + /// Returns the [ProcedureMeta] of all procedures. fn list_procedure(&self) -> Vec { let procedures = self.procedures.read().unwrap(); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 677fb33745..aacb61f6e1 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -601,6 +601,7 @@ mod tests { use futures_util::FutureExt; use object_store::{EntryMode, ObjectStore}; use tokio::sync::mpsc; + use tokio::sync::watch::Receiver; use super::*; use crate::local::{test_util, DynamicKeyLockGuard}; @@ -668,6 +669,13 @@ mod tests { unimplemented!() } + async fn procedure_state_receiver( + &self, + _procedure_id: ProcedureId, + ) -> Result>> { + unimplemented!() + } + async fn try_put_poison( &self, _key: &PoisonKey, diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index e208f754b0..0b5746d268 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use snafu::{ResultExt, Snafu}; +use tokio::sync::watch::Receiver; use uuid::Uuid; use crate::error::{self, Error, Result}; @@ -58,6 +59,14 @@ pub enum Status { } impl Status { + /// Returns a [Status::Suspended] with given `subprocedures` and `persist` flag. + pub fn suspended(subprocedures: Vec, persist: bool) -> Status { + Status::Suspended { + subprocedures, + persist, + } + } + /// Returns a [Status::Poisoned] with given `keys` and `error`. pub fn poisoned(keys: impl IntoIterator, error: Error) -> Status { Status::Poisoned { @@ -140,6 +149,11 @@ pub trait ContextProvider: Send + Sync { /// Query the procedure state. async fn procedure_state(&self, procedure_id: ProcedureId) -> Result>; + async fn procedure_state_receiver( + &self, + procedure_id: ProcedureId, + ) -> Result>>; + /// Try to put a poison key for a procedure. /// /// This method is used to mark a resource as being operated on by a procedure.