From f44816cc156cd10165a7497175bcafc650a7404d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 6 Aug 2025 19:24:03 +0800 Subject: [PATCH] feat: add metrics for reconciliation procedures (#6652) * feat: add metrics for reconciliation procedures Signed-off-by: WenyXu * refactor: improve error handling Signed-off-by: WenyXu * fix(datanode): handle ignore_nonexistent_region flag in open_all_regions Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * refactor: merge metrics Signed-off-by: WenyXu * chore: minor refactor Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/error.rs | 38 +- src/common/meta/src/metrics.rs | 42 ++ .../src/reconciliation/reconcile_catalog.rs | 46 +- .../reconciliation/reconcile_catalog/end.rs | 12 +- .../reconcile_catalog/reconcile_databases.rs | 9 +- .../src/reconciliation/reconcile_database.rs | 54 +- .../reconciliation/reconcile_database/end.rs | 13 +- .../reconcile_logical_tables.rs | 33 +- .../reconcile_database/reconcile_tables.rs | 20 +- .../reconcile_database/start.rs | 6 +- .../reconcile_database/utils.rs | 79 --- .../reconcile_logical_tables.rs | 33 +- .../reconciliation_end.rs | 17 +- .../reconciliation_start.rs | 63 +- .../resolve_table_metadatas.rs | 40 +- .../update_table_infos.rs | 4 + .../src/reconciliation/reconcile_table.rs | 73 +- .../reconcile_table/reconciliation_end.rs | 14 +- .../reconcile_table/reconciliation_start.rs | 56 +- .../resolve_column_metadata.rs | 34 +- .../reconcile_table/update_table_info.rs | 3 + src/common/meta/src/reconciliation/utils.rs | 654 ++++++++++++++++-- src/datanode/src/datanode.rs | 56 +- 23 files changed, 1101 insertions(+), 298 deletions(-) delete mode 100644 src/common/meta/src/reconciliation/reconcile_database/utils.rs diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4af0c28cb8..add64e877f 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -954,6 +954,39 @@ pub enum Error { table_name: String, table_id: TableId, }, + + #[snafu(display( + "Column not found in column metadata, column_name: {}, column_id: {}", + column_name, + column_id + ))] + ColumnNotFound { column_name: String, column_id: u32 }, + + #[snafu(display( + "Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}", + column_name, + expected_column_id, + actual_column_id + ))] + ColumnIdMismatch { + column_name: String, + expected_column_id: u32, + actual_column_id: u32, + }, + + #[snafu(display( + "Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}", + expected_column_name, + expected_column_id, + actual_column_name, + actual_column_id, + ))] + TimestampMismatch { + expected_column_name: String, + expected_column_id: u32, + actual_column_name: String, + actual_column_id: u32, + }, } pub type Result = std::result::Result; @@ -979,7 +1012,10 @@ impl ErrorExt for Error { | MissingColumnIds { .. } | MissingColumnInColumnMetadata { .. } | MismatchColumnId { .. } - | ColumnMetadataConflicts { .. } => StatusCode::Unexpected, + | ColumnMetadataConflicts { .. } + | ColumnNotFound { .. } + | ColumnIdMismatch { .. } + | TimestampMismatch { .. } => StatusCode::Unexpected, Unsupported { .. } => StatusCode::Unsupported, WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable, diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 2df82c8aba..a977c3ed1c 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -15,6 +15,13 @@ use lazy_static::lazy_static; use prometheus::*; +pub const TABLE_TYPE_PHYSICAL: &str = "physical"; +pub const TABLE_TYPE_LOGICAL: &str = "logical"; +pub const ERROR_TYPE_RETRYABLE: &str = "retryable"; +pub const ERROR_TYPE_EXTERNAL: &str = "external"; +pub const STATS_TYPE_NO_REGION_METADATA: &str = "no_region_metadata"; +pub const STATS_TYPE_REGION_NOT_OPEN: &str = "region_not_open"; + lazy_static! { pub static ref METRIC_META_TXN_REQUEST: HistogramVec = register_histogram_vec!( "greptime_meta_txn_request", @@ -114,4 +121,39 @@ lazy_static! { &["backend", "result", "op", "type"] ) .unwrap(); + pub static ref METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION: HistogramVec = + register_histogram_vec!( + "greptime_meta_reconciliation_list_region_metadata_duration", + "reconciliation list region metadata duration", + &["table_type"] + ) + .unwrap(); + pub static ref METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA: IntCounterVec = + register_int_counter_vec!( + "greptime_meta_reconciliation_resolved_column_metadata", + "reconciliation resolved column metadata", + &["strategy"] + ) + .unwrap(); + pub static ref METRIC_META_RECONCILIATION_STATS: IntCounterVec = + register_int_counter_vec!( + "greptime_meta_reconciliation_stats", + "reconciliation stats", + &["procedure_name", "table_type", "type"] + ) + .unwrap(); + pub static ref METRIC_META_RECONCILIATION_PROCEDURE: HistogramVec = + register_histogram_vec!( + "greptime_meta_reconciliation_procedure", + "reconcile table procedure", + &["procedure_name", "step"] + ) + .unwrap(); + pub static ref METRIC_META_RECONCILIATION_PROCEDURE_ERROR: IntCounterVec = + register_int_counter_vec!( + "greptime_meta_reconciliation_procedure_error", + "reconciliation procedure error", + &["procedure_name", "step", "error_type"] + ) + .unwrap(); } diff --git a/src/common/meta/src/reconciliation/reconcile_catalog.rs b/src/common/meta/src/reconciliation/reconcile_catalog.rs index 8a9fa8031a..341ffb3fdd 100644 --- a/src/common/meta/src/reconciliation/reconcile_catalog.rs +++ b/src/common/meta/src/reconciliation/reconcile_catalog.rs @@ -14,10 +14,11 @@ use std::any::Any; use std::fmt::Debug; +use std::time::Instant; use common_procedure::error::FromJsonSnafu; use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId, + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, }; use futures::stream::BoxStream; @@ -28,11 +29,13 @@ use crate::cache_invalidator::CacheInvalidatorRef; use crate::error::Result; use crate::key::TableMetadataManagerRef; use crate::lock_key::CatalogLock; +use crate::metrics; use crate::node_manager::NodeManagerRef; use crate::reconciliation::reconcile_catalog::start::ReconcileCatalogStart; -use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures; use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; -use crate::reconciliation::utils::Context; +use crate::reconciliation::utils::{ + wait_for_inflight_subprocedures, Context, ReconcileCatalogMetrics, SubprocedureMeta, +}; pub(crate) mod end; pub(crate) mod reconcile_databases; @@ -61,13 +64,15 @@ impl ReconcileCatalogContext { &mut self, procedure_ctx: &ProcedureContext, ) -> Result<()> { - if let Some(procedure_id) = self.volatile_ctx.inflight_subprocedure { - wait_for_inflight_subprocedures( + if let Some(subprocedure) = self.volatile_ctx.inflight_subprocedure.take() { + let subprocedures = [subprocedure]; + let result = wait_for_inflight_subprocedures( procedure_ctx, - &[procedure_id], + &subprocedures, self.persistent_ctx.fast_fail, ) .await?; + self.volatile_ctx.metrics += result.into(); } Ok(()) } @@ -97,12 +102,26 @@ impl PersistentContext { } } -#[derive(Default)] pub(crate) struct VolatileContext { /// Stores the stream of catalogs. schemas: Option>>, /// Stores the inflight subprocedure. - inflight_subprocedure: Option, + inflight_subprocedure: Option, + /// Stores the metrics of reconciling catalog. + metrics: ReconcileCatalogMetrics, + /// The start time of the reconciliation. + start_time: Instant, +} + +impl Default for VolatileContext { + fn default() -> Self { + Self { + schemas: None, + inflight_subprocedure: None, + metrics: Default::default(), + start_time: Instant::now(), + } + } } pub struct ReconcileCatalogProcedure { @@ -158,6 +177,11 @@ impl Procedure for ReconcileCatalogProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; + let procedure_name = Self::TYPE_NAME; + let step = state.name(); + let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE + .with_label_values(&[procedure_name, step]) + .start_timer(); match state.next(&mut self.context, _ctx).await { Ok((next, status)) => { *state = next; @@ -165,8 +189,14 @@ impl Procedure for ReconcileCatalogProcedure { } Err(e) => { if e.is_retry_later() { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE]) + .inc(); Err(ProcedureError::retry_later(e)) } else { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL]) + .inc(); Err(ProcedureError::external(e)) } } diff --git a/src/common/meta/src/reconciliation/reconcile_catalog/end.rs b/src/common/meta/src/reconciliation/reconcile_catalog/end.rs index e259bb38b3..964338c26e 100644 --- a/src/common/meta/src/reconciliation/reconcile_catalog/end.rs +++ b/src/common/meta/src/reconciliation/reconcile_catalog/end.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; use crate::error::Result; @@ -28,9 +29,16 @@ pub(crate) struct ReconcileCatalogEnd; impl State for ReconcileCatalogEnd { async fn next( &mut self, - _ctx: &mut ReconcileCatalogContext, - _procedure_ctx: &ProcedureContext, + ctx: &mut ReconcileCatalogContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { + info!( + "Catalog reconciliation completed. catalog: {}, procedure_id: {}, metrics: {}, elapsed: {:?}", + ctx.persistent_ctx.catalog, + procedure_ctx.procedure_id, + ctx.volatile_ctx.metrics, + ctx.volatile_ctx.start_time.elapsed() + ); Ok((Box::new(ReconcileCatalogEnd), Status::done())) } diff --git a/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs b/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs index b51d45e2e6..9e30571029 100644 --- a/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs +++ b/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs @@ -23,7 +23,7 @@ use crate::error::Result; use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd; use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State}; use crate::reconciliation::reconcile_database::ReconcileDatabaseProcedure; -use crate::reconciliation::utils::Context; +use crate::reconciliation::utils::{Context, SubprocedureMeta}; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct ReconcileDatabases; @@ -83,13 +83,18 @@ impl ReconcileDatabases { let procedure = ReconcileDatabaseProcedure::new( context, ctx.persistent_ctx.catalog.clone(), - schema, + schema.clone(), ctx.persistent_ctx.fast_fail, ctx.persistent_ctx.parallelism, ctx.persistent_ctx.resolve_strategy, true, ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + ctx.volatile_ctx.inflight_subprocedure = Some(SubprocedureMeta::new_reconcile_database( + procedure_with_id.id, + ctx.persistent_ctx.catalog.clone(), + schema, + )); Ok(( Box::new(ReconcileDatabases), diff --git a/src/common/meta/src/reconciliation/reconcile_database.rs b/src/common/meta/src/reconciliation/reconcile_database.rs index 195d915cd7..f4beffa973 100644 --- a/src/common/meta/src/reconciliation/reconcile_database.rs +++ b/src/common/meta/src/reconciliation/reconcile_database.rs @@ -16,16 +16,16 @@ pub(crate) mod end; pub(crate) mod reconcile_logical_tables; pub(crate) mod reconcile_tables; pub(crate) mod start; -pub(crate) mod utils; use std::any::Any; use std::collections::HashMap; use std::fmt::Debug; +use std::time::Instant; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId, + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, }; use futures::stream::BoxStream; @@ -39,12 +39,13 @@ use crate::error::Result; use crate::key::table_name::TableNameValue; use crate::key::TableMetadataManagerRef; use crate::lock_key::{CatalogLock, SchemaLock}; +use crate::metrics; use crate::node_manager::NodeManagerRef; use crate::reconciliation::reconcile_database::start::ReconcileDatabaseStart; -use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures; use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; -use crate::reconciliation::utils::Context; - +use crate::reconciliation::utils::{ + wait_for_inflight_subprocedures, Context, ReconcileDatabaseMetrics, SubprocedureMeta, +}; pub(crate) const DEFAULT_PARALLELISM: usize = 64; pub(crate) struct ReconcileDatabaseContext { @@ -66,22 +67,32 @@ impl ReconcileDatabaseContext { } } + /// Waits for inflight subprocedures to complete. pub(crate) async fn wait_for_inflight_subprocedures( &mut self, procedure_ctx: &ProcedureContext, ) -> Result<()> { if !self.volatile_ctx.inflight_subprocedures.is_empty() { - wait_for_inflight_subprocedures( + let result = wait_for_inflight_subprocedures( procedure_ctx, &self.volatile_ctx.inflight_subprocedures, self.persistent_ctx.fail_fast, ) .await?; + + // Collects result into metrics + let metrics = result.into(); self.volatile_ctx.inflight_subprocedures.clear(); + self.volatile_ctx.metrics += metrics; } Ok(()) } + + /// Returns the immutable metrics. + pub(crate) fn metrics(&self) -> &ReconcileDatabaseMetrics { + &self.volatile_ctx.metrics + } } #[derive(Debug, Serialize, Deserialize)] @@ -114,7 +125,6 @@ impl PersistentContext { } } -#[derive(Default)] pub(crate) struct VolatileContext { /// Stores pending physical tables. pending_tables: Vec<(TableId, TableName)>, @@ -124,9 +134,26 @@ pub(crate) struct VolatileContext { /// - Value: Vector of (TableId, TableName) tuples representing logical tables belonging to the physical table. pending_logical_tables: HashMap>, /// Stores inflight subprocedures. - inflight_subprocedures: Vec, + inflight_subprocedures: Vec, /// Stores the stream of tables. tables: Option>>, + /// The metrics of reconciling database. + metrics: ReconcileDatabaseMetrics, + /// The start time of the reconciliation. + start_time: Instant, +} + +impl Default for VolatileContext { + fn default() -> Self { + Self { + pending_tables: vec![], + pending_logical_tables: HashMap::new(), + inflight_subprocedures: vec![], + tables: None, + metrics: ReconcileDatabaseMetrics::default(), + start_time: Instant::now(), + } + } } pub struct ReconcileDatabaseProcedure { @@ -190,6 +217,11 @@ impl Procedure for ReconcileDatabaseProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; + let procedure_name = Self::TYPE_NAME; + let step = state.name(); + let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE + .with_label_values(&[procedure_name, step]) + .start_timer(); match state.next(&mut self.context, _ctx).await { Ok((next, status)) => { *state = next; @@ -197,8 +229,14 @@ impl Procedure for ReconcileDatabaseProcedure { } Err(e) => { if e.is_retry_later() { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE]) + .inc(); Err(ProcedureError::retry_later(e)) } else { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL]) + .inc(); Err(ProcedureError::external(e)) } } diff --git a/src/common/meta/src/reconciliation/reconcile_database/end.rs b/src/common/meta/src/reconciliation/reconcile_database/end.rs index 9d9bd75fc9..cbbd7bdbac 100644 --- a/src/common/meta/src/reconciliation/reconcile_database/end.rs +++ b/src/common/meta/src/reconciliation/reconcile_database/end.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; use crate::error::Result; @@ -28,9 +29,17 @@ pub(crate) struct ReconcileDatabaseEnd; impl State for ReconcileDatabaseEnd { async fn next( &mut self, - _ctx: &mut ReconcileDatabaseContext, - _procedure_ctx: &ProcedureContext, + ctx: &mut ReconcileDatabaseContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { + info!( + "Database reconciliation completed. schema: {}, catalog: {}, procedure_id: {}, metrics: {}, elapsed: {:?}", + ctx.persistent_ctx.schema, + ctx.persistent_ctx.catalog, + procedure_ctx.procedure_id, + ctx.metrics(), + ctx.volatile_ctx.start_time.elapsed(), + ); Ok((Box::new(ReconcileDatabaseEnd), Status::done())) } diff --git a/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs b/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs index 985a8f64fa..2bf0457aeb 100644 --- a/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs +++ b/src/common/meta/src/reconciliation/reconcile_database/reconcile_logical_tables.rs @@ -29,7 +29,7 @@ use crate::key::table_route::TableRouteValue; use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd; use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State}; use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure; -use crate::reconciliation::utils::Context; +use crate::reconciliation::utils::{Context, SubprocedureMeta}; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct ReconcileLogicalTables; @@ -128,13 +128,12 @@ impl State for ReconcileLogicalTables { impl ReconcileLogicalTables { fn schedule_reconcile_logical_tables( ctx: &mut ReconcileDatabaseContext, - buffer: &mut Vec, + buffer: &mut Vec<(ProcedureWithId, SubprocedureMeta)>, ) -> Result<(Box, Status)> { - let procedures = std::mem::take(buffer); - ctx.volatile_ctx - .inflight_subprocedures - .extend(procedures.iter().map(|p| p.id)); + let buffer = std::mem::take(buffer); + let (procedures, meta): (Vec<_>, Vec<_>) = buffer.into_iter().unzip(); + ctx.volatile_ctx.inflight_subprocedures.extend(meta); Ok(( Box::new(ReconcileLogicalTables), Status::suspended(procedures, false), @@ -142,7 +141,7 @@ impl ReconcileLogicalTables { } fn should_schedule_reconcile_logical_tables( - buffer: &[ProcedureWithId], + buffer: &[(ProcedureWithId, SubprocedureMeta)], parallelism: usize, ) -> bool { buffer.len() >= parallelism @@ -152,7 +151,7 @@ impl ReconcileLogicalTables { ctx: &Context, pending_logical_tables: &mut HashMap>, parallelism: usize, - ) -> Result> { + ) -> Result> { let mut physical_table_id = None; for (table_id, tables) in pending_logical_tables.iter() { if tables.len() >= parallelism { @@ -176,7 +175,7 @@ impl ReconcileLogicalTables { async fn build_remaining_procedures( ctx: &Context, pending_logical_tables: &mut HashMap>, - pending_procedures: &mut Vec, + pending_procedures: &mut Vec<(ProcedureWithId, SubprocedureMeta)>, parallelism: usize, ) -> Result<()> { if pending_logical_tables.is_empty() { @@ -203,7 +202,7 @@ impl ReconcileLogicalTables { ctx: &Context, physical_table_id: TableId, logical_tables: Vec<(TableId, TableName)>, - ) -> Result { + ) -> Result<(ProcedureWithId, SubprocedureMeta)> { let table_info = ctx .table_metadata_manager .table_info_manager() @@ -217,12 +216,18 @@ impl ReconcileLogicalTables { let procedure = ReconcileLogicalTablesProcedure::new( ctx.clone(), physical_table_id, - physical_table_name, - logical_tables, + physical_table_name.clone(), + logical_tables.clone(), true, ); - - Ok(ProcedureWithId::with_random_id(Box::new(procedure))) + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let subprocedure_meta = SubprocedureMeta::new_logical_table( + procedure_with_id.id, + physical_table_id, + physical_table_name, + logical_tables, + ); + Ok((procedure_with_id, subprocedure_meta)) } fn enqueue_logical_table( diff --git a/src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs b/src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs index fa26b5ecb5..93f24fcf2f 100644 --- a/src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs +++ b/src/common/meta/src/reconciliation/reconcile_database/reconcile_tables.rs @@ -27,7 +27,7 @@ use crate::key::table_route::TableRouteValue; use crate::reconciliation::reconcile_database::reconcile_logical_tables::ReconcileLogicalTables; use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State}; use crate::reconciliation::reconcile_table::ReconcileTableProcedure; -use crate::reconciliation::utils::Context; +use crate::reconciliation::utils::{Context, SubprocedureMeta}; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct ReconcileTables; @@ -104,14 +104,14 @@ impl ReconcileTables { ctx: &mut ReconcileDatabaseContext, ) -> Result<(Box, Status)> { let tables = std::mem::take(&mut ctx.volatile_ctx.pending_tables); - let subprocedures = Self::build_reconcile_table_procedures(ctx, tables); - ctx.volatile_ctx - .inflight_subprocedures - .extend(subprocedures.iter().map(|p| p.id)); - + let (procedures, meta): (Vec<_>, Vec<_>) = + Self::build_reconcile_table_procedures(ctx, tables) + .into_iter() + .unzip(); + ctx.volatile_ctx.inflight_subprocedures.extend(meta); Ok(( Box::new(ReconcileTables), - Status::suspended(subprocedures, false), + Status::suspended(procedures, false), )) } @@ -125,7 +125,7 @@ impl ReconcileTables { fn build_reconcile_table_procedures( ctx: &ReconcileDatabaseContext, tables: Vec<(TableId, TableName)>, - ) -> Vec { + ) -> Vec<(ProcedureWithId, SubprocedureMeta)> { let mut procedures = Vec::with_capacity(tables.len()); for (table_id, table_name) in tables { let context = Context { @@ -141,11 +141,13 @@ impl ReconcileTables { true, ); let procedure = ProcedureWithId::with_random_id(Box::new(procedure)); + let meta = + SubprocedureMeta::new_physical_table(procedure.id, table_id, table_name.clone()); info!( "Reconcile table: {}, table_id: {}, procedure_id: {}", table_name, table_id, procedure.id ); - procedures.push(procedure) + procedures.push((procedure, meta)); } procedures diff --git a/src/common/meta/src/reconciliation/reconcile_database/start.rs b/src/common/meta/src/reconciliation/reconcile_database/start.rs index a1d6f38c05..73fed9c0bb 100644 --- a/src/common/meta/src/reconciliation/reconcile_database/start.rs +++ b/src/common/meta/src/reconciliation/reconcile_database/start.rs @@ -33,7 +33,7 @@ impl State for ReconcileDatabaseStart { async fn next( &mut self, ctx: &mut ReconcileDatabaseContext, - _procedure_ctx: &ProcedureContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { let exists = ctx .table_metadata_manager @@ -51,8 +51,8 @@ impl State for ReconcileDatabaseStart { }, ); info!( - "Reconcile database: {}, catalog: {}", - ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog + "Reconcile database: {}, catalog: {}, procedure_id: {}", + ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog, procedure_ctx.procedure_id, ); Ok((Box::new(ReconcileTables), Status::executing(true))) } diff --git a/src/common/meta/src/reconciliation/reconcile_database/utils.rs b/src/common/meta/src/reconciliation/reconcile_database/utils.rs deleted file mode 100644 index fccdaeec98..0000000000 --- a/src/common/meta/src/reconciliation/reconcile_database/utils.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_procedure::{watcher, Context as ProcedureContext, ProcedureId}; -use common_telemetry::{error, info, warn}; -use futures::future::{join_all, try_join_all}; -use snafu::{OptionExt, ResultExt}; - -use crate::error::{ - ProcedureStateReceiverNotFoundSnafu, ProcedureStateReceiverSnafu, Result, WaitProcedureSnafu, -}; - -/// Wait for inflight subprocedures. -/// -/// If `fail_fast` is true, the function will return an error if any subprocedure fails. -/// Otherwise, the function will continue waiting for all subprocedures to complete. -pub(crate) async fn wait_for_inflight_subprocedures( - procedure_ctx: &ProcedureContext, - subprocedures: &[ProcedureId], - fail_fast: bool, -) -> Result<()> { - let mut receivers = Vec::with_capacity(subprocedures.len()); - for procedure_id in subprocedures { - let receiver = procedure_ctx - .provider - .procedure_state_receiver(*procedure_id) - .await - .context(ProcedureStateReceiverSnafu { - procedure_id: *procedure_id, - })? - .context(ProcedureStateReceiverNotFoundSnafu { - procedure_id: *procedure_id, - })?; - receivers.push(receiver); - } - - let mut tasks = Vec::with_capacity(receivers.len()); - for receiver in receivers.iter_mut() { - let fut = watcher::wait(receiver); - tasks.push(fut); - } - - if fail_fast { - try_join_all(tasks).await.context(WaitProcedureSnafu)?; - } else { - let mut failed = 0; - let total = tasks.len(); - for result in join_all(tasks).await { - if let Err(e) = result { - error!(e; "inflight subprocedure, procedure_id: {}", procedure_ctx.procedure_id); - failed += 1; - } - } - if failed > 0 { - warn!( - "{} inflight subprocedures failed, total: {}, procedure_id: {}", - failed, total, procedure_ctx.procedure_id - ); - } else { - info!( - "{} inflight subprocedures completed, procedure_id: {}", - total, procedure_ctx.procedure_id - ); - } - } - - Ok(()) -} diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables.rs index e2520f8b1a..a067767c72 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables.rs @@ -40,15 +40,17 @@ use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; +use crate::metrics; use crate::node_manager::NodeManagerRef; use crate::reconciliation::reconcile_logical_tables::reconciliation_start::ReconciliationStart; -use crate::reconciliation::utils::Context; +use crate::reconciliation::utils::{Context, ReconcileLogicalTableMetrics}; pub struct ReconcileLogicalTablesContext { pub node_manager: NodeManagerRef, pub table_metadata_manager: TableMetadataManagerRef, pub cache_invalidator: CacheInvalidatorRef, pub persistent_ctx: PersistentContext, + pub volatile_ctx: VolatileContext, } impl ReconcileLogicalTablesContext { @@ -59,16 +61,29 @@ impl ReconcileLogicalTablesContext { table_metadata_manager: ctx.table_metadata_manager, cache_invalidator: ctx.cache_invalidator, persistent_ctx, + volatile_ctx: VolatileContext::default(), } } + /// Returns the physical table name. pub(crate) fn table_name(&self) -> &TableName { &self.persistent_ctx.table_name } + /// Returns the physical table id. pub(crate) fn table_id(&self) -> TableId { self.persistent_ctx.table_id } + + /// Returns a mutable reference to the metrics. + pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileLogicalTableMetrics { + &mut self.volatile_ctx.metrics + } + + /// Returns a reference to the metrics. + pub(crate) fn metrics(&self) -> &ReconcileLogicalTableMetrics { + &self.volatile_ctx.metrics + } } #[derive(Debug, Serialize, Deserialize)] @@ -120,6 +135,11 @@ impl PersistentContext { } } +#[derive(Default)] +pub(crate) struct VolatileContext { + pub(crate) metrics: ReconcileLogicalTableMetrics, +} + pub struct ReconcileLogicalTablesProcedure { pub context: ReconcileLogicalTablesContext, state: Box, @@ -173,6 +193,11 @@ impl Procedure for ReconcileLogicalTablesProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; + let procedure_name = Self::TYPE_NAME; + let step = state.name(); + let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE + .with_label_values(&[procedure_name, step]) + .start_timer(); match state.next(&mut self.context, _ctx).await { Ok((next, status)) => { *state = next; @@ -180,8 +205,14 @@ impl Procedure for ReconcileLogicalTablesProcedure { } Err(e) => { if e.is_retry_later() { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE]) + .inc(); Err(ProcedureError::retry_later(e)) } else { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL]) + .inc(); Err(ProcedureError::external(e)) } } diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_end.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_end.rs index 09beaa702b..a3c074cbe6 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_end.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_end.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; use crate::error::Result; @@ -28,9 +29,21 @@ pub struct ReconciliationEnd; impl State for ReconciliationEnd { async fn next( &mut self, - _ctx: &mut ReconcileLogicalTablesContext, - _procedure_ctx: &ProcedureContext, + ctx: &mut ReconcileLogicalTablesContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + let metrics = ctx.metrics(); + + info!( + "Logical tables reconciliation completed. logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}, metrics: {}", + ctx.persistent_ctx.logical_table_ids, + table_id, + table_name, + procedure_ctx.procedure_id, + metrics + ); Ok((Box::new(ReconciliationEnd), Status::done())) } diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_start.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_start.rs index b74cec603a..1649abdc07 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_start.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconciliation_start.rs @@ -25,8 +25,11 @@ use crate::ddl::utils::region_metadata_lister::RegionMetadataLister; use crate::ddl::utils::table_id::get_all_table_ids_by_names; use crate::ddl::utils::table_info::all_logical_table_routes_have_same_physical_id; use crate::error::{self, Result}; +use crate::metrics; use crate::reconciliation::reconcile_logical_tables::resolve_table_metadatas::ResolveTableMetadatas; -use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State}; +use crate::reconciliation::reconcile_logical_tables::{ + ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State, +}; use crate::reconciliation::utils::check_column_metadatas_consistent; /// The start state of the reconciliation procedure. @@ -39,7 +42,7 @@ impl State for ReconciliationStart { async fn next( &mut self, ctx: &mut ReconcileLogicalTablesContext, - _procedure_ctx: &ProcedureContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { let table_id = ctx.table_id(); let table_name = ctx.table_name(); @@ -58,35 +61,48 @@ impl State for ReconciliationStart { } ); - info!( - "Starting reconciliation for logical table: table_id: {}, table_name: {}", - table_id, table_name - ); - let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone()); - let region_metadatas = region_metadata_lister - .list(physical_table_id, &physical_table_route.region_routes) - .await?; + let region_metadatas = { + let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION + .with_label_values(&[metrics::TABLE_TYPE_PHYSICAL]) + .start_timer(); + region_metadata_lister + .list(physical_table_id, &physical_table_route.region_routes) + .await? + }; + + ensure!(!region_metadatas.is_empty(), { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileLogicalTablesProcedure::TYPE_NAME, + metrics::TABLE_TYPE_PHYSICAL, + metrics::STATS_TYPE_NO_REGION_METADATA, + ]) + .inc(); - ensure!( - !region_metadatas.is_empty(), error::UnexpectedSnafu { err_msg: format!( - "No region metadata found for table: {}, table_id: {}", + "No region metadata found for physical table: {}, table_id: {}", table_name, table_id ), } - ); + }); - if region_metadatas.iter().any(|r| r.is_none()) { - return error::UnexpectedSnafu { + ensure!(region_metadatas.iter().all(|r| r.is_some()), { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileLogicalTablesProcedure::TYPE_NAME, + metrics::TABLE_TYPE_PHYSICAL, + metrics::STATS_TYPE_REGION_NOT_OPEN, + ]) + .inc(); + error::UnexpectedSnafu { err_msg: format!( - "Some regions of the physical table are not open. Table: {}, table_id: {}", + "Some regions of the physical table are not open. physical table: {}, table_id: {}", table_name, table_id ), } - .fail(); - } + }); // Safety: checked above let region_metadatas = region_metadatas @@ -96,14 +112,13 @@ impl State for ReconciliationStart { let _region_metadata = check_column_metadatas_consistent(®ion_metadatas).context( error::UnexpectedSnafu { err_msg: format!( - "Column metadatas are not consistent for table: {}, table_id: {}", + "Column metadatas are not consistent for physical table: {}, table_id: {}", table_name, table_id ), }, )?; // TODO(weny): ensure all columns in region metadata can be found in table info. - // Validates the logical tables. Self::validate_schema(&ctx.persistent_ctx.logical_tables)?; let table_refs = ctx @@ -119,6 +134,12 @@ impl State for ReconciliationStart { .await?; Self::validate_logical_table_routes(ctx, &table_ids).await?; + let table_name = ctx.table_name(); + info!( + "Starting reconciliation for logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}", + table_ids, table_id, table_name, procedure_ctx.procedure_id + ); + ctx.persistent_ctx.physical_table_route = Some(physical_table_route); ctx.persistent_ctx.logical_table_ids = table_ids; Ok((Box::new(ResolveTableMetadatas), Status::executing(true))) diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/resolve_table_metadatas.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/resolve_table_metadatas.rs index 3504337a90..6b08ff81a6 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/resolve_table_metadatas.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/resolve_table_metadatas.rs @@ -22,8 +22,11 @@ use snafu::ensure; use crate::ddl::utils::region_metadata_lister::RegionMetadataLister; use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids; use crate::error::{self, Result}; +use crate::metrics; use crate::reconciliation::reconcile_logical_tables::reconcile_regions::ReconcileRegions; -use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State}; +use crate::reconciliation::reconcile_logical_tables::{ + ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State, +}; use crate::reconciliation::utils::{ check_column_metadatas_consistent, need_update_logical_table_info, }; @@ -65,22 +68,38 @@ impl State for ResolveTableMetadatas { .unwrap() .region_routes; let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone()); + let mut metadata_consistent_count = 0; + let mut metadata_inconsistent_count = 0; + let mut create_tables_count = 0; for (table_id, table_info_value) in table_ids.iter().zip(table_info_values.iter()) { - let region_metadatas = region_metadata_lister - .list(*table_id, region_routes) - .await?; + let region_metadatas = { + let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION + .with_label_values(&[metrics::TABLE_TYPE_LOGICAL]) + .start_timer(); + region_metadata_lister + .list(*table_id, region_routes) + .await? + }; + + ensure!(!region_metadatas.is_empty(), { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileLogicalTablesProcedure::TYPE_NAME, + metrics::TABLE_TYPE_LOGICAL, + metrics::STATS_TYPE_NO_REGION_METADATA, + ]) + .inc(); - ensure!( - !region_metadatas.is_empty(), error::UnexpectedSnafu { err_msg: format!( "No region metadata found for table: {}, table_id: {}", table_info_value.table_info.name, table_id ), } - ); + }); if region_metadatas.iter().any(|r| r.is_none()) { + create_tables_count += 1; create_tables.push((*table_id, table_info_value.table_info.clone())); continue; } @@ -91,10 +110,12 @@ impl State for ResolveTableMetadatas { .map(|r| r.unwrap()) .collect::>(); if let Some(column_metadatas) = check_column_metadatas_consistent(®ion_metadatas) { + metadata_consistent_count += 1; if need_update_logical_table_info(&table_info_value.table_info, &column_metadatas) { update_table_infos.push((*table_id, column_metadatas)); } } else { + metadata_inconsistent_count += 1; // If the logical regions have inconsistent column metadatas, it won't affect read and write. // It's safe to continue if the column metadatas of the logical table are inconsistent. warn!( @@ -121,6 +142,11 @@ impl State for ResolveTableMetadatas { ); ctx.persistent_ctx.update_table_infos = update_table_infos; ctx.persistent_ctx.create_tables = create_tables; + // Update metrics. + let metrics = ctx.mut_metrics(); + metrics.column_metadata_consistent_count = metadata_consistent_count; + metrics.column_metadata_inconsistent_count = metadata_inconsistent_count; + metrics.create_tables_count = create_tables_count; Ok((Box::new(ReconcileRegions), Status::executing(true))) } diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs index fd7c276f8b..e82d210573 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/update_table_infos.rs @@ -96,6 +96,7 @@ impl State for UpdateTableInfos { let table_id = ctx.table_id(); let table_name = ctx.table_name(); + let updated_table_info_num = table_info_values_to_update.len(); batch_update_table_info_values(&ctx.table_metadata_manager, table_info_values_to_update) .await?; @@ -122,6 +123,9 @@ impl State for UpdateTableInfos { .await?; ctx.persistent_ctx.update_table_infos.clear(); + // Update metrics. + let metrics = ctx.mut_metrics(); + metrics.update_table_info_count = updated_table_info_num; Ok((Box::new(ReconciliationEnd), Status::executing(false))) } diff --git a/src/common/meta/src/reconciliation/reconcile_table.rs b/src/common/meta/src/reconciliation/reconcile_table.rs index afd1877ca3..c1ca21b971 100644 --- a/src/common/meta/src/reconciliation/reconcile_table.rs +++ b/src/common/meta/src/reconciliation/reconcile_table.rs @@ -40,10 +40,13 @@ use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; +use crate::metrics; use crate::node_manager::NodeManagerRef; use crate::reconciliation::reconcile_table::reconciliation_start::ReconciliationStart; use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; -use crate::reconciliation::utils::{build_table_meta_from_column_metadatas, Context}; +use crate::reconciliation::utils::{ + build_table_meta_from_column_metadatas, Context, ReconcileTableMetrics, +}; pub struct ReconcileTableContext { pub node_manager: NodeManagerRef, @@ -65,13 +68,46 @@ impl ReconcileTableContext { } } + /// Returns the physical table name. pub(crate) fn table_name(&self) -> &TableName { &self.persistent_ctx.table_name } + /// Returns the physical table id. pub(crate) fn table_id(&self) -> TableId { self.persistent_ctx.table_id } + + /// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. + pub(crate) fn build_table_meta( + &self, + column_metadatas: &[ColumnMetadata], + ) -> Result { + // Safety: The table info value is set in `ReconciliationStart` state. + let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap(); + let table_id = self.table_id(); + let table_ref = self.table_name().table_ref(); + let name_to_ids = table_info_value.table_info.name_to_ids(); + let table_meta = build_table_meta_from_column_metadatas( + table_id, + table_ref, + &table_info_value.table_info.meta, + name_to_ids, + column_metadatas, + )?; + + Ok(table_meta) + } + + /// Returns a mutable reference to the metrics. + pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileTableMetrics { + &mut self.volatile_ctx.metrics + } + + /// Returns a reference to the metrics. + pub(crate) fn metrics(&self) -> &ReconcileTableMetrics { + &self.volatile_ctx.metrics + } } #[derive(Debug, Serialize, Deserialize)] @@ -110,29 +146,7 @@ impl PersistentContext { #[derive(Default)] pub(crate) struct VolatileContext { pub(crate) table_meta: Option, -} - -impl ReconcileTableContext { - /// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. - pub(crate) fn build_table_meta( - &self, - column_metadatas: &[ColumnMetadata], - ) -> Result { - // Safety: The table info value is set in `ReconciliationStart` state. - let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap(); - let table_id = self.table_id(); - let table_ref = self.table_name().table_ref(); - let name_to_ids = table_info_value.table_info.name_to_ids(); - let table_meta = build_table_meta_from_column_metadatas( - table_id, - table_ref, - &table_info_value.table_info.meta, - name_to_ids, - column_metadatas, - )?; - - Ok(table_meta) - } + pub(crate) metrics: ReconcileTableMetrics, } pub struct ReconcileTableProcedure { @@ -191,6 +205,11 @@ impl Procedure for ReconcileTableProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; + let procedure_name = Self::TYPE_NAME; + let step = state.name(); + let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE + .with_label_values(&[procedure_name, step]) + .start_timer(); match state.next(&mut self.context, _ctx).await { Ok((next, status)) => { *state = next; @@ -198,8 +217,14 @@ impl Procedure for ReconcileTableProcedure { } Err(e) => { if e.is_retry_later() { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE]) + .inc(); Err(ProcedureError::retry_later(e)) } else { + metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR + .with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL]) + .inc(); Err(ProcedureError::external(e)) } } diff --git a/src/common/meta/src/reconciliation/reconcile_table/reconciliation_end.rs b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_end.rs index 1fc11bed95..9f28d7f4be 100644 --- a/src/common/meta/src/reconciliation/reconcile_table/reconciliation_end.rs +++ b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_end.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; use serde::{Deserialize, Serialize}; use tonic::async_trait; @@ -31,9 +32,18 @@ pub struct ReconciliationEnd; impl State for ReconciliationEnd { async fn next( &mut self, - _ctx: &mut ReconcileTableContext, - _procedure_ctx: &ProcedureContext, + ctx: &mut ReconcileTableContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { + let table_id = ctx.table_id(); + let table_name = ctx.table_name(); + let metrics = ctx.metrics(); + + info!( + "Physical table reconciliation completed. table_name: {}, table_id: {}, procedure_id: {}, metrics: {}", + table_name, table_id, procedure_ctx.procedure_id, metrics + ); + Ok((Box::new(ReconciliationEnd), Status::done())) } diff --git a/src/common/meta/src/reconciliation/reconcile_table/reconciliation_start.rs b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_start.rs index 4ae4d1cbb4..2fcc8ded0c 100644 --- a/src/common/meta/src/reconciliation/reconcile_table/reconciliation_start.rs +++ b/src/common/meta/src/reconciliation/reconcile_table/reconciliation_start.rs @@ -20,9 +20,12 @@ use serde::{Deserialize, Serialize}; use snafu::ensure; use crate::ddl::utils::region_metadata_lister::RegionMetadataLister; -use crate::error::{self, Result, UnexpectedSnafu}; +use crate::error::{self, Result}; +use crate::metrics::{self}; use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveColumnMetadata; -use crate::reconciliation::reconcile_table::{ReconcileTableContext, State}; +use crate::reconciliation::reconcile_table::{ + ReconcileTableContext, ReconcileTableProcedure, State, +}; /// The start state of the reconciliation procedure. /// @@ -40,7 +43,7 @@ impl State for ReconciliationStart { async fn next( &mut self, ctx: &mut ReconcileTableContext, - _procedure_ctx: &ProcedureContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { let table_id = ctx.table_id(); let table_name = ctx.table_name(); @@ -60,33 +63,56 @@ impl State for ReconciliationStart { } ); - info!("Reconciling table: {}, table_id: {}", table_name, table_id); + info!( + "Reconciling table: {}, table_id: {}, procedure_id: {}", + table_name, table_id, procedure_ctx.procedure_id + ); // TODO(weny): Repairs the table route if needed. let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone()); - // Always list region metadatas for the physical table. - let region_metadatas = region_metadata_lister - .list(physical_table_id, &physical_table_route.region_routes) - .await?; - ensure!( - !region_metadatas.is_empty(), + let region_metadatas = { + let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION + .with_label_values(&[metrics::TABLE_TYPE_PHYSICAL]) + .start_timer(); + // Always list region metadatas for the physical table. + region_metadata_lister + .list(physical_table_id, &physical_table_route.region_routes) + .await? + }; + + ensure!(!region_metadatas.is_empty(), { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileTableProcedure::TYPE_NAME, + metrics::TABLE_TYPE_PHYSICAL, + metrics::STATS_TYPE_NO_REGION_METADATA, + ]) + .inc(); + error::UnexpectedSnafu { err_msg: format!( "No region metadata found for table: {}, table_id: {}", table_name, table_id ), } - ); + }); - if region_metadatas.iter().any(|r| r.is_none()) { - return UnexpectedSnafu { + ensure!(region_metadatas.iter().all(|r| r.is_some()), { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileTableProcedure::TYPE_NAME, + metrics::TABLE_TYPE_PHYSICAL, + metrics::STATS_TYPE_REGION_NOT_OPEN, + ]) + .inc(); + + error::UnexpectedSnafu { err_msg: format!( "Some regions are not opened, table: {}, table_id: {}", table_name, table_id ), } - .fail(); - } + }); // Persist the physical table route. // TODO(weny): refetch the physical table route if repair is needed. diff --git a/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs index b2dfa620f5..97fd7d473b 100644 --- a/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs +++ b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs @@ -20,6 +20,7 @@ use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::OptionExt; use store_api::metadata::RegionMetadata; +use strum::AsRefStr; use crate::error::{self, MissingColumnIdsSnafu, Result}; use crate::reconciliation::reconcile_table::reconcile_regions::ReconcileRegions; @@ -28,10 +29,11 @@ use crate::reconciliation::reconcile_table::{ReconcileTableContext, State}; use crate::reconciliation::utils::{ build_column_metadata_from_table_info, check_column_metadatas_consistent, resolve_column_metadatas_with_latest, resolve_column_metadatas_with_metasrv, + ResolveColumnMetadataResult, }; /// Strategy for resolving column metadata inconsistencies. -#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default, AsRefStr)] pub enum ResolveStrategy { #[default] /// Trusts the latest column metadata from datanode. @@ -98,6 +100,10 @@ impl State for ResolveColumnMetadata { "Column metadatas are consistent for table: {}, table_id: {}.", table_name, table_id ); + + // Update metrics. + ctx.mut_metrics().resolve_column_metadata_result = + Some(ResolveColumnMetadataResult::Consistent); return Ok(( Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)), Status::executing(false), @@ -119,6 +125,11 @@ impl State for ResolveColumnMetadata { let region_ids = resolve_column_metadatas_with_metasrv(&column_metadata, &self.region_metadata)?; + + // Update metrics. + let metrics = ctx.mut_metrics(); + metrics.resolve_column_metadata_result = + Some(ResolveColumnMetadataResult::Inconsistent(self.strategy)); Ok(( Box::new(ReconcileRegions::new(column_metadata, region_ids)), Status::executing(true), @@ -127,16 +138,29 @@ impl State for ResolveColumnMetadata { ResolveStrategy::UseLatest => { let (column_metadatas, region_ids) = resolve_column_metadatas_with_latest(&self.region_metadata)?; + + // Update metrics. + let metrics = ctx.mut_metrics(); + metrics.resolve_column_metadata_result = + Some(ResolveColumnMetadataResult::Inconsistent(self.strategy)); Ok(( Box::new(ReconcileRegions::new(column_metadatas, region_ids)), Status::executing(true), )) } - ResolveStrategy::AbortOnConflict => error::ColumnMetadataConflictsSnafu { - table_name: table_name.to_string(), - table_id, + ResolveStrategy::AbortOnConflict => { + let table_name = table_name.to_string(); + + // Update metrics. + let metrics = ctx.mut_metrics(); + metrics.resolve_column_metadata_result = + Some(ResolveColumnMetadataResult::Inconsistent(self.strategy)); + error::ColumnMetadataConflictsSnafu { + table_name, + table_id, + } + .fail() } - .fail(), } } diff --git a/src/common/meta/src/reconciliation/reconcile_table/update_table_info.rs b/src/common/meta/src/reconciliation/reconcile_table/update_table_info.rs index a6736daed0..16284a22ef 100644 --- a/src/common/meta/src/reconciliation/reconcile_table/update_table_info.rs +++ b/src/common/meta/src/reconciliation/reconcile_table/update_table_info.rs @@ -116,6 +116,9 @@ impl State for UpdateTableInfo { ], ) .await?; + // Update metrics. + let metrics = ctx.mut_metrics(); + metrics.update_table_info = true; Ok((Box::new(ReconciliationEnd), Status::executing(true))) } diff --git a/src/common/meta/src/reconciliation/utils.rs b/src/common/meta/src/reconciliation/utils.rs index 5610651913..42cbc6703c 100644 --- a/src/common/meta/src/reconciliation/utils.rs +++ b/src/common/meta/src/reconciliation/utils.rs @@ -13,23 +13,35 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; -use std::fmt; +use std::fmt::{self, Display}; +use std::ops::AddAssign; +use std::time::Instant; use api::v1::SemanticType; -use common_telemetry::warn; +use common_procedure::{watcher, Context as ProcedureContext, ProcedureId}; +use common_telemetry::{error, warn}; use datatypes::schema::ColumnSchema; -use snafu::{ensure, OptionExt}; +use futures::future::{join_all, try_join_all}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{RegionId, TableId}; use table::metadata::{RawTableInfo, RawTableMeta}; +use table::table_name::TableName; use table::table_reference::TableReference; use crate::cache_invalidator::CacheInvalidatorRef; use crate::error::{ - MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu, + ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu, + MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu, + ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu, + WaitProcedureSnafu, }; use crate::key::TableMetadataManagerRef; +use crate::metrics; use crate::node_manager::NodeManagerRef; +use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure; +use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; +use crate::reconciliation::reconcile_table::ReconcileTableProcedure; #[derive(Debug, PartialEq, Eq)] pub(crate) struct PartialRegionMetadata<'a> { @@ -48,20 +60,6 @@ impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> { } } -/// A display wrapper for [`ColumnMetadata`] that formats the column metadata in a more readable way. -struct ColumnMetadataDisplay<'a>(pub &'a ColumnMetadata); - -impl<'a> fmt::Debug for ColumnMetadataDisplay<'a> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let col = self.0; - write!( - f, - "Column {{ name: {}, id: {}, semantic_type: {:?}, data_type: {:?} }}", - col.column_schema.name, col.column_id, col.semantic_type, col.column_schema.data_type, - ) - } -} - /// Checks if the column metadatas are consistent. /// /// The column metadatas are consistent if: @@ -110,21 +108,7 @@ pub(crate) fn resolve_column_metadatas_with_metasrv( let mut regions_ids = vec![]; for region_metadata in region_metadatas { if region_metadata.column_metadatas != column_metadatas { - let is_invariant_preserved = check_column_metadata_invariants( - column_metadatas, - ®ion_metadata.column_metadatas, - ); - ensure!( - is_invariant_preserved, - UnexpectedSnafu { - err_msg: format!( - "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}", - region_metadata.region_id, - column_metadatas.iter().map(ColumnMetadataDisplay).collect::>(), - region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::>(), - ) - } - ); + check_column_metadata_invariants(column_metadatas, ®ion_metadata.column_metadatas)?; regions_ids.push(region_metadata.region_id); } } @@ -163,21 +147,10 @@ pub(crate) fn resolve_column_metadatas_with_latest( let mut region_ids = vec![]; for region_metadata in region_metadatas { if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas { - let is_invariant_preserved = check_column_metadata_invariants( + check_column_metadata_invariants( &latest_region_metadata.column_metadatas, ®ion_metadata.column_metadatas, - ); - ensure!( - is_invariant_preserved, - UnexpectedSnafu { - err_msg: format!( - "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}", - region_metadata.region_id, - latest_column_metadatas.column_metadatas.iter().map(ColumnMetadataDisplay).collect::>(), - region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::>() - ) - } - ); + )?; region_ids.push(region_metadata.region_id); } } @@ -239,7 +212,7 @@ pub(crate) fn build_column_metadata_from_table_info( pub(crate) fn check_column_metadata_invariants( new_column_metadatas: &[ColumnMetadata], column_metadatas: &[ColumnMetadata], -) -> bool { +) -> Result<()> { let new_primary_keys = new_column_metadatas .iter() .filter(|c| c.semantic_type == SemanticType::Tag) @@ -252,22 +225,50 @@ pub(crate) fn check_column_metadata_invariants( .map(|c| (c.column_schema.name.as_str(), c.column_id)); for (name, id) in old_primary_keys { - if new_primary_keys.get(name) != Some(&id) { - return false; - } + let column_id = new_primary_keys + .get(name) + .cloned() + .context(ColumnNotFoundSnafu { + column_name: name, + column_id: id, + })?; + + ensure!( + column_id == id, + ColumnIdMismatchSnafu { + column_name: name, + expected_column_id: id, + actual_column_id: column_id, + } + ); } let new_ts_column = new_column_metadatas .iter() .find(|c| c.semantic_type == SemanticType::Timestamp) - .map(|c| (c.column_schema.name.as_str(), c.column_id)); + .map(|c| (c.column_schema.name.as_str(), c.column_id)) + .context(UnexpectedSnafu { + err_msg: "Timestamp column not found in new column metadata", + })?; let old_ts_column = column_metadatas .iter() .find(|c| c.semantic_type == SemanticType::Timestamp) - .map(|c| (c.column_schema.name.as_str(), c.column_id)); + .map(|c| (c.column_schema.name.as_str(), c.column_id)) + .context(UnexpectedSnafu { + err_msg: "Timestamp column not found in column metadata", + })?; + ensure!( + new_ts_column == old_ts_column, + TimestampMismatchSnafu { + expected_column_name: old_ts_column.0, + expected_column_id: old_ts_column.1, + actual_column_name: new_ts_column.0, + actual_column_id: new_ts_column.1, + } + ); - new_ts_column == old_ts_column + Ok(()) } /// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s. @@ -406,6 +407,88 @@ pub(crate) fn need_update_logical_table_info( table_info.meta.schema.column_schemas.len() != column_metadatas.len() } +/// The result of waiting for inflight subprocedures. +pub struct PartialSuccessResult<'a> { + pub failed_procedures: Vec<&'a SubprocedureMeta>, + pub success_procedures: Vec<&'a SubprocedureMeta>, +} + +/// The result of waiting for inflight subprocedures. +pub enum WaitForInflightSubproceduresResult<'a> { + Success(Vec<&'a SubprocedureMeta>), + PartialSuccess(PartialSuccessResult<'a>), +} + +/// Wait for inflight subprocedures. +/// +/// If `fail_fast` is true, the function will return an error if any subprocedure fails. +/// Otherwise, the function will continue waiting for all subprocedures to complete. +pub(crate) async fn wait_for_inflight_subprocedures<'a>( + procedure_ctx: &ProcedureContext, + subprocedures: &'a [SubprocedureMeta], + fail_fast: bool, +) -> Result> { + let mut receivers = Vec::with_capacity(subprocedures.len()); + for subprocedure in subprocedures { + let procedure_id = subprocedure.procedure_id(); + let receiver = procedure_ctx + .provider + .procedure_state_receiver(procedure_id) + .await + .context(ProcedureStateReceiverSnafu { procedure_id })? + .context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?; + receivers.push((receiver, subprocedure)); + } + + let mut tasks = Vec::with_capacity(receivers.len()); + for (receiver, subprocedure) in receivers.iter_mut() { + tasks.push(async move { + watcher::wait(receiver).await.inspect_err(|e| { + error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure); + }) + }); + } + + if fail_fast { + try_join_all(tasks).await.context(WaitProcedureSnafu)?; + return Ok(WaitForInflightSubproceduresResult::Success( + subprocedures.iter().collect(), + )); + } + + // If fail_fast is false, we need to wait for all subprocedures to complete. + let results = join_all(tasks).await; + let failed_procedures_num = results.iter().filter(|r| r.is_err()).count(); + if failed_procedures_num == 0 { + return Ok(WaitForInflightSubproceduresResult::Success( + subprocedures.iter().collect(), + )); + } + warn!( + "{} inflight subprocedures failed, total: {}, parent procedure_id: {}", + failed_procedures_num, + subprocedures.len(), + procedure_ctx.procedure_id + ); + + let mut failed_procedures = Vec::with_capacity(failed_procedures_num); + let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num); + for (result, subprocedure) in results.into_iter().zip(subprocedures) { + if result.is_err() { + failed_procedures.push(subprocedure); + } else { + success_procedures.push(subprocedure); + } + } + + Ok(WaitForInflightSubproceduresResult::PartialSuccess( + PartialSuccessResult { + failed_procedures, + success_procedures, + }, + )) +} + #[derive(Clone)] pub struct Context { pub node_manager: NodeManagerRef, @@ -413,6 +496,446 @@ pub struct Context { pub cache_invalidator: CacheInvalidatorRef, } +/// Metadata for an inflight physical table subprocedure. +pub struct PhysicalTableMeta { + pub procedure_id: ProcedureId, + pub table_id: TableId, + pub table_name: TableName, +} + +/// Metadata for an inflight logical table subprocedure. +pub struct LogicalTableMeta { + pub procedure_id: ProcedureId, + pub physical_table_id: TableId, + pub physical_table_name: TableName, + pub logical_tables: Vec<(TableId, TableName)>, +} + +/// Metadata for an inflight database subprocedure. +pub struct ReconcileDatabaseMeta { + pub procedure_id: ProcedureId, + pub catalog: String, + pub schema: String, +} + +/// The inflight subprocedure metadata. +pub enum SubprocedureMeta { + PhysicalTable(PhysicalTableMeta), + LogicalTable(LogicalTableMeta), + Database(ReconcileDatabaseMeta), +} + +impl Display for SubprocedureMeta { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SubprocedureMeta::PhysicalTable(meta) => { + write!( + f, + "ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})", + meta.procedure_id, meta.table_id, meta.table_name + ) + } + SubprocedureMeta::LogicalTable(meta) => { + write!( + f, + "ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})", + meta.procedure_id, meta.physical_table_id, meta.physical_table_name, meta.logical_tables + ) + } + SubprocedureMeta::Database(meta) => { + write!( + f, + "ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})", + meta.procedure_id, meta.catalog, meta.schema + ) + } + } + } +} + +impl SubprocedureMeta { + /// Creates a new logical table subprocedure metadata. + pub fn new_logical_table( + procedure_id: ProcedureId, + physical_table_id: TableId, + physical_table_name: TableName, + logical_tables: Vec<(TableId, TableName)>, + ) -> Self { + Self::LogicalTable(LogicalTableMeta { + procedure_id, + physical_table_id, + physical_table_name, + logical_tables, + }) + } + + /// Creates a new physical table subprocedure metadata. + pub fn new_physical_table( + procedure_id: ProcedureId, + table_id: TableId, + table_name: TableName, + ) -> Self { + Self::PhysicalTable(PhysicalTableMeta { + procedure_id, + table_id, + table_name, + }) + } + + /// Creates a new reconcile database subprocedure metadata. + pub fn new_reconcile_database( + procedure_id: ProcedureId, + catalog: String, + schema: String, + ) -> Self { + Self::Database(ReconcileDatabaseMeta { + procedure_id, + catalog, + schema, + }) + } + + /// Returns the procedure id of the subprocedure. + pub fn procedure_id(&self) -> ProcedureId { + match self { + SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id, + SubprocedureMeta::LogicalTable(meta) => meta.procedure_id, + SubprocedureMeta::Database(meta) => meta.procedure_id, + } + } + + /// Returns the number of tables will be reconciled. + pub fn table_num(&self) -> usize { + match self { + SubprocedureMeta::PhysicalTable(_) => 1, + SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(), + SubprocedureMeta::Database(_) => 0, + } + } + + /// Returns the number of databases will be reconciled. + pub fn database_num(&self) -> usize { + match self { + SubprocedureMeta::Database(_) => 1, + _ => 0, + } + } +} + +/// The metrics of reconciling catalog. +#[derive(Clone, Default)] +pub struct ReconcileCatalogMetrics { + pub succeeded_databases: usize, + pub failed_databases: usize, +} + +impl AddAssign for ReconcileCatalogMetrics { + fn add_assign(&mut self, other: Self) { + self.succeeded_databases += other.succeeded_databases; + self.failed_databases += other.failed_databases; + } +} + +impl Display for ReconcileCatalogMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "succeeded_databases: {}, failed_databases: {}", + self.succeeded_databases, self.failed_databases + ) + } +} + +impl From> for ReconcileCatalogMetrics { + fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self { + match result { + WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics { + succeeded_databases: subprocedures.len(), + failed_databases: 0, + }, + WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult { + failed_procedures, + success_procedures, + }) => { + let succeeded_databases = success_procedures + .iter() + .map(|subprocedure| subprocedure.database_num()) + .sum(); + let failed_databases = failed_procedures + .iter() + .map(|subprocedure| subprocedure.database_num()) + .sum(); + ReconcileCatalogMetrics { + succeeded_databases, + failed_databases, + } + } + } + } +} + +/// The metrics of reconciling database. +#[derive(Clone, Default)] +pub struct ReconcileDatabaseMetrics { + pub succeeded_tables: usize, + pub failed_tables: usize, + pub succeeded_procedures: usize, + pub failed_procedures: usize, +} + +impl Display for ReconcileDatabaseMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}", self.succeeded_tables, self.failed_tables, self.succeeded_procedures, self.failed_procedures) + } +} + +impl AddAssign for ReconcileDatabaseMetrics { + fn add_assign(&mut self, other: Self) { + self.succeeded_tables += other.succeeded_tables; + self.failed_tables += other.failed_tables; + self.succeeded_procedures += other.succeeded_procedures; + self.failed_procedures += other.failed_procedures; + } +} + +impl From> for ReconcileDatabaseMetrics { + fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self { + match result { + WaitForInflightSubproceduresResult::Success(subprocedures) => { + let table_num = subprocedures + .iter() + .map(|subprocedure| subprocedure.table_num()) + .sum(); + ReconcileDatabaseMetrics { + succeeded_procedures: subprocedures.len(), + failed_procedures: 0, + succeeded_tables: table_num, + failed_tables: 0, + } + } + WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult { + failed_procedures, + success_procedures, + }) => { + let succeeded_tables = success_procedures + .iter() + .map(|subprocedure| subprocedure.table_num()) + .sum(); + let failed_tables = failed_procedures + .iter() + .map(|subprocedure| subprocedure.table_num()) + .sum(); + ReconcileDatabaseMetrics { + succeeded_procedures: success_procedures.len(), + failed_procedures: failed_procedures.len(), + succeeded_tables, + failed_tables, + } + } + } + } +} + +/// The metrics of reconciling logical tables. +#[derive(Clone)] +pub struct ReconcileLogicalTableMetrics { + pub start_time: Instant, + pub update_table_info_count: usize, + pub create_tables_count: usize, + pub column_metadata_consistent_count: usize, + pub column_metadata_inconsistent_count: usize, +} + +impl Default for ReconcileLogicalTableMetrics { + fn default() -> Self { + Self { + start_time: Instant::now(), + update_table_info_count: 0, + create_tables_count: 0, + column_metadata_consistent_count: 0, + column_metadata_inconsistent_count: 0, + } + } +} + +const CREATE_TABLES: &str = "create_tables"; +const UPDATE_TABLE_INFO: &str = "update_table_info"; +const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent"; +const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent"; + +impl ReconcileLogicalTableMetrics { + /// The total number of tables that have been reconciled. + pub fn total_table_count(&self) -> usize { + self.create_tables_count + + self.column_metadata_consistent_count + + self.column_metadata_inconsistent_count + } +} + +impl Drop for ReconcileLogicalTableMetrics { + fn drop(&mut self) { + let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME; + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES]) + .inc_by(self.create_tables_count as u64); + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + procedure_name, + metrics::TABLE_TYPE_LOGICAL, + UPDATE_TABLE_INFO, + ]) + .inc_by(self.update_table_info_count as u64); + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + procedure_name, + metrics::TABLE_TYPE_LOGICAL, + COLUMN_METADATA_CONSISTENT, + ]) + .inc_by(self.column_metadata_consistent_count as u64); + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + procedure_name, + metrics::TABLE_TYPE_LOGICAL, + COLUMN_METADATA_INCONSISTENT, + ]) + .inc_by(self.column_metadata_inconsistent_count as u64); + } +} + +impl Display for ReconcileLogicalTableMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let elapsed = self.start_time.elapsed(); + if self.create_tables_count > 0 { + write!(f, "create_tables_count: {}, ", self.create_tables_count)?; + } + if self.update_table_info_count > 0 { + write!( + f, + "update_table_info_count: {}, ", + self.update_table_info_count + )?; + } + if self.column_metadata_consistent_count > 0 { + write!( + f, + "column_metadata_consistent_count: {}, ", + self.column_metadata_consistent_count + )?; + } + if self.column_metadata_inconsistent_count > 0 { + write!( + f, + "column_metadata_inconsistent_count: {}, ", + self.column_metadata_inconsistent_count + )?; + } + + write!( + f, + "total_table_count: {}, elapsed: {:?}", + self.total_table_count(), + elapsed + ) + } +} + +/// The result of resolving column metadata. +#[derive(Clone, Copy)] +pub enum ResolveColumnMetadataResult { + Consistent, + Inconsistent(ResolveStrategy), +} + +impl Display for ResolveColumnMetadataResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"), + ResolveColumnMetadataResult::Inconsistent(strategy) => { + let strategy_str = strategy.as_ref(); + write!(f, "Inconsistent({})", strategy_str) + } + } + } +} + +/// The metrics of reconciling physical tables. +#[derive(Clone)] +pub struct ReconcileTableMetrics { + /// The start time of the reconciliation. + pub start_time: Instant, + /// The result of resolving column metadata. + pub resolve_column_metadata_result: Option, + /// Whether the table info has been updated. + pub update_table_info: bool, +} + +impl Drop for ReconcileTableMetrics { + fn drop(&mut self) { + if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result { + match resolve_column_metadata_result { + ResolveColumnMetadataResult::Consistent => { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileTableProcedure::TYPE_NAME, + metrics::TABLE_TYPE_PHYSICAL, + COLUMN_METADATA_CONSISTENT, + ]) + .inc(); + } + ResolveColumnMetadataResult::Inconsistent(strategy) => { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileTableProcedure::TYPE_NAME, + metrics::TABLE_TYPE_PHYSICAL, + COLUMN_METADATA_INCONSISTENT, + ]) + .inc(); + metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA + .with_label_values(&[strategy.as_ref()]) + .inc(); + } + } + } + if self.update_table_info { + metrics::METRIC_META_RECONCILIATION_STATS + .with_label_values(&[ + ReconcileTableProcedure::TYPE_NAME, + metrics::TABLE_TYPE_PHYSICAL, + UPDATE_TABLE_INFO, + ]) + .inc(); + } + } +} + +impl Default for ReconcileTableMetrics { + fn default() -> Self { + Self { + start_time: Instant::now(), + resolve_column_metadata_result: None, + update_table_info: false, + } + } +} + +impl Display for ReconcileTableMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let elapsed = self.start_time.elapsed(); + if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result { + write!( + f, + "resolve_column_metadata_result: {}, ", + resolve_column_metadata_result + )?; + } + write!( + f, + "update_table_info: {}, elapsed: {:?}", + self.update_table_info, elapsed + ) + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -665,10 +1188,7 @@ mod tests { semantic_type: SemanticType::Field, column_id: 3, }); - assert!(check_column_metadata_invariants( - &new_column_metadatas, - &column_metadatas - )); + check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap(); } #[test] @@ -676,18 +1196,12 @@ mod tests { let column_metadatas = new_test_column_metadatas(); let mut new_column_metadatas = column_metadatas.clone(); new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp); - assert!(!check_column_metadata_invariants( - &new_column_metadatas, - &column_metadatas - )); + check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err(); let column_metadatas = new_test_column_metadatas(); let mut new_column_metadatas = column_metadatas.clone(); new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag); - assert!(!check_column_metadata_invariants( - &new_column_metadatas, - &column_metadatas - )); + check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err(); } #[test] @@ -700,10 +1214,7 @@ mod tests { { col.column_id = 100; } - assert!(!check_column_metadata_invariants( - &new_column_metadatas, - &column_metadatas - )); + check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err(); let column_metadatas = new_test_column_metadatas(); let mut new_column_metadatas = column_metadatas.clone(); @@ -713,10 +1224,7 @@ mod tests { { col.column_id = 100; } - assert!(!check_column_metadata_invariants( - &new_column_metadatas, - &column_metadatas - )); + check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err(); } #[test] diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index cc05e038c0..66bd5c98b3 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -639,16 +639,24 @@ async fn open_all_regions( ignore_nonexistent_region, ) .await?; - ensure!( - open_regions.len() == num_regions, - error::UnexpectedSnafu { - violated: format!( - "Expected to open {} of regions, only {} of regions has opened", - num_regions, - open_regions.len() - ) - } - ); + if !ignore_nonexistent_region { + ensure!( + open_regions.len() == num_regions, + error::UnexpectedSnafu { + violated: format!( + "Expected to open {} of regions, only {} of regions has opened", + num_regions, + open_regions.len() + ) + } + ); + } else if open_regions.len() != num_regions { + warn!( + "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened", + num_regions, + open_regions.len() + ); + } for region_id in open_regions { if open_with_writable { @@ -688,16 +696,24 @@ async fn open_all_regions( ) .await?; - ensure!( - open_regions.len() == num_regions, - error::UnexpectedSnafu { - violated: format!( - "Expected to open {} of follower regions, only {} of regions has opened", - num_regions, - open_regions.len() - ) - } - ); + if !ignore_nonexistent_region { + ensure!( + open_regions.len() == num_regions, + error::UnexpectedSnafu { + violated: format!( + "Expected to open {} of follower regions, only {} of regions has opened", + num_regions, + open_regions.len() + ) + } + ); + } else if open_regions.len() != num_regions { + warn!( + "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened", + num_regions, + open_regions.len() + ); + } } info!("all regions are opened");