feat: add metrics for reconciliation procedures (#6652)

* feat: add metrics for reconciliation procedures

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: improve error handling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix(datanode): handle ignore_nonexistent_region flag in open_all_regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: merge metrics

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: minor refactor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-06 19:24:03 +08:00
committed by WenyXu
parent c4c2b87615
commit f44816cc15
23 changed files with 1101 additions and 298 deletions

View File

@@ -954,6 +954,39 @@ pub enum Error {
table_name: String,
table_id: TableId,
},
#[snafu(display(
"Column not found in column metadata, column_name: {}, column_id: {}",
column_name,
column_id
))]
ColumnNotFound { column_name: String, column_id: u32 },
#[snafu(display(
"Column id mismatch, column_name: {}, expected column_id: {}, actual column_id: {}",
column_name,
expected_column_id,
actual_column_id
))]
ColumnIdMismatch {
column_name: String,
expected_column_id: u32,
actual_column_id: u32,
},
#[snafu(display(
"Timestamp column mismatch, expected column_name: {}, expected column_id: {}, actual column_name: {}, actual column_id: {}",
expected_column_name,
expected_column_id,
actual_column_name,
actual_column_id,
))]
TimestampMismatch {
expected_column_name: String,
expected_column_id: u32,
actual_column_name: String,
actual_column_id: u32,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -979,7 +1012,10 @@ impl ErrorExt for Error {
| MissingColumnIds { .. }
| MissingColumnInColumnMetadata { .. }
| MismatchColumnId { .. }
| ColumnMetadataConflicts { .. } => StatusCode::Unexpected,
| ColumnMetadataConflicts { .. }
| ColumnNotFound { .. }
| ColumnIdMismatch { .. }
| TimestampMismatch { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
WriteObject { .. } | ReadObject { .. } => StatusCode::StorageUnavailable,

View File

@@ -15,6 +15,13 @@
use lazy_static::lazy_static;
use prometheus::*;
pub const TABLE_TYPE_PHYSICAL: &str = "physical";
pub const TABLE_TYPE_LOGICAL: &str = "logical";
pub const ERROR_TYPE_RETRYABLE: &str = "retryable";
pub const ERROR_TYPE_EXTERNAL: &str = "external";
pub const STATS_TYPE_NO_REGION_METADATA: &str = "no_region_metadata";
pub const STATS_TYPE_REGION_NOT_OPEN: &str = "region_not_open";
lazy_static! {
pub static ref METRIC_META_TXN_REQUEST: HistogramVec = register_histogram_vec!(
"greptime_meta_txn_request",
@@ -114,4 +121,39 @@ lazy_static! {
&["backend", "result", "op", "type"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION: HistogramVec =
register_histogram_vec!(
"greptime_meta_reconciliation_list_region_metadata_duration",
"reconciliation list region metadata duration",
&["table_type"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA: IntCounterVec =
register_int_counter_vec!(
"greptime_meta_reconciliation_resolved_column_metadata",
"reconciliation resolved column metadata",
&["strategy"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_STATS: IntCounterVec =
register_int_counter_vec!(
"greptime_meta_reconciliation_stats",
"reconciliation stats",
&["procedure_name", "table_type", "type"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_PROCEDURE: HistogramVec =
register_histogram_vec!(
"greptime_meta_reconciliation_procedure",
"reconcile table procedure",
&["procedure_name", "step"]
)
.unwrap();
pub static ref METRIC_META_RECONCILIATION_PROCEDURE_ERROR: IntCounterVec =
register_int_counter_vec!(
"greptime_meta_reconciliation_procedure_error",
"reconciliation procedure error",
&["procedure_name", "step", "error_type"]
)
.unwrap();
}

View File

@@ -14,10 +14,11 @@
use std::any::Any;
use std::fmt::Debug;
use std::time::Instant;
use common_procedure::error::FromJsonSnafu;
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
@@ -28,11 +29,13 @@ use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::lock_key::CatalogLock;
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_catalog::start::ReconcileCatalogStart;
use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::Context;
use crate::reconciliation::utils::{
wait_for_inflight_subprocedures, Context, ReconcileCatalogMetrics, SubprocedureMeta,
};
pub(crate) mod end;
pub(crate) mod reconcile_databases;
@@ -61,13 +64,15 @@ impl ReconcileCatalogContext {
&mut self,
procedure_ctx: &ProcedureContext,
) -> Result<()> {
if let Some(procedure_id) = self.volatile_ctx.inflight_subprocedure {
wait_for_inflight_subprocedures(
if let Some(subprocedure) = self.volatile_ctx.inflight_subprocedure.take() {
let subprocedures = [subprocedure];
let result = wait_for_inflight_subprocedures(
procedure_ctx,
&[procedure_id],
&subprocedures,
self.persistent_ctx.fast_fail,
)
.await?;
self.volatile_ctx.metrics += result.into();
}
Ok(())
}
@@ -97,12 +102,26 @@ impl PersistentContext {
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
/// Stores the stream of catalogs.
schemas: Option<BoxStream<'static, Result<String>>>,
/// Stores the inflight subprocedure.
inflight_subprocedure: Option<ProcedureId>,
inflight_subprocedure: Option<SubprocedureMeta>,
/// Stores the metrics of reconciling catalog.
metrics: ReconcileCatalogMetrics,
/// The start time of the reconciliation.
start_time: Instant,
}
impl Default for VolatileContext {
fn default() -> Self {
Self {
schemas: None,
inflight_subprocedure: None,
metrics: Default::default(),
start_time: Instant::now(),
}
}
}
pub struct ReconcileCatalogProcedure {
@@ -158,6 +177,11 @@ impl Procedure for ReconcileCatalogProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -165,8 +189,14 @@ impl Procedure for ReconcileCatalogProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -28,9 +29,16 @@ pub(crate) struct ReconcileCatalogEnd;
impl State for ReconcileCatalogEnd {
async fn next(
&mut self,
_ctx: &mut ReconcileCatalogContext,
_procedure_ctx: &ProcedureContext,
ctx: &mut ReconcileCatalogContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
info!(
"Catalog reconciliation completed. catalog: {}, procedure_id: {}, metrics: {}, elapsed: {:?}",
ctx.persistent_ctx.catalog,
procedure_ctx.procedure_id,
ctx.volatile_ctx.metrics,
ctx.volatile_ctx.start_time.elapsed()
);
Ok((Box::new(ReconcileCatalogEnd), Status::done()))
}

View File

@@ -23,7 +23,7 @@ use crate::error::Result;
use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd;
use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
use crate::reconciliation::reconcile_database::ReconcileDatabaseProcedure;
use crate::reconciliation::utils::Context;
use crate::reconciliation::utils::{Context, SubprocedureMeta};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileDatabases;
@@ -83,13 +83,18 @@ impl ReconcileDatabases {
let procedure = ReconcileDatabaseProcedure::new(
context,
ctx.persistent_ctx.catalog.clone(),
schema,
schema.clone(),
ctx.persistent_ctx.fast_fail,
ctx.persistent_ctx.parallelism,
ctx.persistent_ctx.resolve_strategy,
true,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
ctx.volatile_ctx.inflight_subprocedure = Some(SubprocedureMeta::new_reconcile_database(
procedure_with_id.id,
ctx.persistent_ctx.catalog.clone(),
schema,
));
Ok((
Box::new(ReconcileDatabases),

View File

@@ -16,16 +16,16 @@ pub(crate) mod end;
pub(crate) mod reconcile_logical_tables;
pub(crate) mod reconcile_tables;
pub(crate) mod start;
pub(crate) mod utils;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::time::Instant;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
@@ -39,12 +39,13 @@ use crate::error::Result;
use crate::key::table_name::TableNameValue;
use crate::key::TableMetadataManagerRef;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_database::start::ReconcileDatabaseStart;
use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::Context;
use crate::reconciliation::utils::{
wait_for_inflight_subprocedures, Context, ReconcileDatabaseMetrics, SubprocedureMeta,
};
pub(crate) const DEFAULT_PARALLELISM: usize = 64;
pub(crate) struct ReconcileDatabaseContext {
@@ -66,22 +67,32 @@ impl ReconcileDatabaseContext {
}
}
/// Waits for inflight subprocedures to complete.
pub(crate) async fn wait_for_inflight_subprocedures(
&mut self,
procedure_ctx: &ProcedureContext,
) -> Result<()> {
if !self.volatile_ctx.inflight_subprocedures.is_empty() {
wait_for_inflight_subprocedures(
let result = wait_for_inflight_subprocedures(
procedure_ctx,
&self.volatile_ctx.inflight_subprocedures,
self.persistent_ctx.fail_fast,
)
.await?;
// Collects result into metrics
let metrics = result.into();
self.volatile_ctx.inflight_subprocedures.clear();
self.volatile_ctx.metrics += metrics;
}
Ok(())
}
/// Returns the immutable metrics.
pub(crate) fn metrics(&self) -> &ReconcileDatabaseMetrics {
&self.volatile_ctx.metrics
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -114,7 +125,6 @@ impl PersistentContext {
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
/// Stores pending physical tables.
pending_tables: Vec<(TableId, TableName)>,
@@ -124,9 +134,26 @@ pub(crate) struct VolatileContext {
/// - Value: Vector of (TableId, TableName) tuples representing logical tables belonging to the physical table.
pending_logical_tables: HashMap<TableId, Vec<(TableId, TableName)>>,
/// Stores inflight subprocedures.
inflight_subprocedures: Vec<ProcedureId>,
inflight_subprocedures: Vec<SubprocedureMeta>,
/// Stores the stream of tables.
tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
/// The metrics of reconciling database.
metrics: ReconcileDatabaseMetrics,
/// The start time of the reconciliation.
start_time: Instant,
}
impl Default for VolatileContext {
fn default() -> Self {
Self {
pending_tables: vec![],
pending_logical_tables: HashMap::new(),
inflight_subprocedures: vec![],
tables: None,
metrics: ReconcileDatabaseMetrics::default(),
start_time: Instant::now(),
}
}
}
pub struct ReconcileDatabaseProcedure {
@@ -190,6 +217,11 @@ impl Procedure for ReconcileDatabaseProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -197,8 +229,14 @@ impl Procedure for ReconcileDatabaseProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -28,9 +29,17 @@ pub(crate) struct ReconcileDatabaseEnd;
impl State for ReconcileDatabaseEnd {
async fn next(
&mut self,
_ctx: &mut ReconcileDatabaseContext,
_procedure_ctx: &ProcedureContext,
ctx: &mut ReconcileDatabaseContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
info!(
"Database reconciliation completed. schema: {}, catalog: {}, procedure_id: {}, metrics: {}, elapsed: {:?}",
ctx.persistent_ctx.schema,
ctx.persistent_ctx.catalog,
procedure_ctx.procedure_id,
ctx.metrics(),
ctx.volatile_ctx.start_time.elapsed(),
);
Ok((Box::new(ReconcileDatabaseEnd), Status::done()))
}

View File

@@ -29,7 +29,7 @@ use crate::key::table_route::TableRouteValue;
use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
use crate::reconciliation::utils::Context;
use crate::reconciliation::utils::{Context, SubprocedureMeta};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileLogicalTables;
@@ -128,13 +128,12 @@ impl State for ReconcileLogicalTables {
impl ReconcileLogicalTables {
fn schedule_reconcile_logical_tables(
ctx: &mut ReconcileDatabaseContext,
buffer: &mut Vec<ProcedureWithId>,
buffer: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
) -> Result<(Box<dyn State>, Status)> {
let procedures = std::mem::take(buffer);
ctx.volatile_ctx
.inflight_subprocedures
.extend(procedures.iter().map(|p| p.id));
let buffer = std::mem::take(buffer);
let (procedures, meta): (Vec<_>, Vec<_>) = buffer.into_iter().unzip();
ctx.volatile_ctx.inflight_subprocedures.extend(meta);
Ok((
Box::new(ReconcileLogicalTables),
Status::suspended(procedures, false),
@@ -142,7 +141,7 @@ impl ReconcileLogicalTables {
}
fn should_schedule_reconcile_logical_tables(
buffer: &[ProcedureWithId],
buffer: &[(ProcedureWithId, SubprocedureMeta)],
parallelism: usize,
) -> bool {
buffer.len() >= parallelism
@@ -152,7 +151,7 @@ impl ReconcileLogicalTables {
ctx: &Context,
pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
parallelism: usize,
) -> Result<Option<ProcedureWithId>> {
) -> Result<Option<(ProcedureWithId, SubprocedureMeta)>> {
let mut physical_table_id = None;
for (table_id, tables) in pending_logical_tables.iter() {
if tables.len() >= parallelism {
@@ -176,7 +175,7 @@ impl ReconcileLogicalTables {
async fn build_remaining_procedures(
ctx: &Context,
pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
pending_procedures: &mut Vec<ProcedureWithId>,
pending_procedures: &mut Vec<(ProcedureWithId, SubprocedureMeta)>,
parallelism: usize,
) -> Result<()> {
if pending_logical_tables.is_empty() {
@@ -203,7 +202,7 @@ impl ReconcileLogicalTables {
ctx: &Context,
physical_table_id: TableId,
logical_tables: Vec<(TableId, TableName)>,
) -> Result<ProcedureWithId> {
) -> Result<(ProcedureWithId, SubprocedureMeta)> {
let table_info = ctx
.table_metadata_manager
.table_info_manager()
@@ -217,12 +216,18 @@ impl ReconcileLogicalTables {
let procedure = ReconcileLogicalTablesProcedure::new(
ctx.clone(),
physical_table_id,
physical_table_name,
logical_tables,
physical_table_name.clone(),
logical_tables.clone(),
true,
);
Ok(ProcedureWithId::with_random_id(Box::new(procedure)))
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let subprocedure_meta = SubprocedureMeta::new_logical_table(
procedure_with_id.id,
physical_table_id,
physical_table_name,
logical_tables,
);
Ok((procedure_with_id, subprocedure_meta))
}
fn enqueue_logical_table(

View File

@@ -27,7 +27,7 @@ use crate::key::table_route::TableRouteValue;
use crate::reconciliation::reconcile_database::reconcile_logical_tables::ReconcileLogicalTables;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
use crate::reconciliation::utils::Context;
use crate::reconciliation::utils::{Context, SubprocedureMeta};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileTables;
@@ -104,14 +104,14 @@ impl ReconcileTables {
ctx: &mut ReconcileDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
let tables = std::mem::take(&mut ctx.volatile_ctx.pending_tables);
let subprocedures = Self::build_reconcile_table_procedures(ctx, tables);
ctx.volatile_ctx
.inflight_subprocedures
.extend(subprocedures.iter().map(|p| p.id));
let (procedures, meta): (Vec<_>, Vec<_>) =
Self::build_reconcile_table_procedures(ctx, tables)
.into_iter()
.unzip();
ctx.volatile_ctx.inflight_subprocedures.extend(meta);
Ok((
Box::new(ReconcileTables),
Status::suspended(subprocedures, false),
Status::suspended(procedures, false),
))
}
@@ -125,7 +125,7 @@ impl ReconcileTables {
fn build_reconcile_table_procedures(
ctx: &ReconcileDatabaseContext,
tables: Vec<(TableId, TableName)>,
) -> Vec<ProcedureWithId> {
) -> Vec<(ProcedureWithId, SubprocedureMeta)> {
let mut procedures = Vec::with_capacity(tables.len());
for (table_id, table_name) in tables {
let context = Context {
@@ -141,11 +141,13 @@ impl ReconcileTables {
true,
);
let procedure = ProcedureWithId::with_random_id(Box::new(procedure));
let meta =
SubprocedureMeta::new_physical_table(procedure.id, table_id, table_name.clone());
info!(
"Reconcile table: {}, table_id: {}, procedure_id: {}",
table_name, table_id, procedure.id
);
procedures.push(procedure)
procedures.push((procedure, meta));
}
procedures

View File

@@ -33,7 +33,7 @@ impl State for ReconcileDatabaseStart {
async fn next(
&mut self,
ctx: &mut ReconcileDatabaseContext,
_procedure_ctx: &ProcedureContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let exists = ctx
.table_metadata_manager
@@ -51,8 +51,8 @@ impl State for ReconcileDatabaseStart {
},
);
info!(
"Reconcile database: {}, catalog: {}",
ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog
"Reconcile database: {}, catalog: {}, procedure_id: {}",
ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog, procedure_ctx.procedure_id,
);
Ok((Box::new(ReconcileTables), Status::executing(true)))
}

View File

@@ -1,79 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_procedure::{watcher, Context as ProcedureContext, ProcedureId};
use common_telemetry::{error, info, warn};
use futures::future::{join_all, try_join_all};
use snafu::{OptionExt, ResultExt};
use crate::error::{
ProcedureStateReceiverNotFoundSnafu, ProcedureStateReceiverSnafu, Result, WaitProcedureSnafu,
};
/// Wait for inflight subprocedures.
///
/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
/// Otherwise, the function will continue waiting for all subprocedures to complete.
pub(crate) async fn wait_for_inflight_subprocedures(
procedure_ctx: &ProcedureContext,
subprocedures: &[ProcedureId],
fail_fast: bool,
) -> Result<()> {
let mut receivers = Vec::with_capacity(subprocedures.len());
for procedure_id in subprocedures {
let receiver = procedure_ctx
.provider
.procedure_state_receiver(*procedure_id)
.await
.context(ProcedureStateReceiverSnafu {
procedure_id: *procedure_id,
})?
.context(ProcedureStateReceiverNotFoundSnafu {
procedure_id: *procedure_id,
})?;
receivers.push(receiver);
}
let mut tasks = Vec::with_capacity(receivers.len());
for receiver in receivers.iter_mut() {
let fut = watcher::wait(receiver);
tasks.push(fut);
}
if fail_fast {
try_join_all(tasks).await.context(WaitProcedureSnafu)?;
} else {
let mut failed = 0;
let total = tasks.len();
for result in join_all(tasks).await {
if let Err(e) = result {
error!(e; "inflight subprocedure, procedure_id: {}", procedure_ctx.procedure_id);
failed += 1;
}
}
if failed > 0 {
warn!(
"{} inflight subprocedures failed, total: {}, procedure_id: {}",
failed, total, procedure_ctx.procedure_id
);
} else {
info!(
"{} inflight subprocedures completed, procedure_id: {}",
total, procedure_ctx.procedure_id
);
}
}
Ok(())
}

View File

@@ -40,15 +40,17 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_logical_tables::reconciliation_start::ReconciliationStart;
use crate::reconciliation::utils::Context;
use crate::reconciliation::utils::{Context, ReconcileLogicalTableMetrics};
pub struct ReconcileLogicalTablesContext {
pub node_manager: NodeManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
pub persistent_ctx: PersistentContext,
pub volatile_ctx: VolatileContext,
}
impl ReconcileLogicalTablesContext {
@@ -59,16 +61,29 @@ impl ReconcileLogicalTablesContext {
table_metadata_manager: ctx.table_metadata_manager,
cache_invalidator: ctx.cache_invalidator,
persistent_ctx,
volatile_ctx: VolatileContext::default(),
}
}
/// Returns the physical table name.
pub(crate) fn table_name(&self) -> &TableName {
&self.persistent_ctx.table_name
}
/// Returns the physical table id.
pub(crate) fn table_id(&self) -> TableId {
self.persistent_ctx.table_id
}
/// Returns a mutable reference to the metrics.
pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileLogicalTableMetrics {
&mut self.volatile_ctx.metrics
}
/// Returns a reference to the metrics.
pub(crate) fn metrics(&self) -> &ReconcileLogicalTableMetrics {
&self.volatile_ctx.metrics
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -120,6 +135,11 @@ impl PersistentContext {
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
pub(crate) metrics: ReconcileLogicalTableMetrics,
}
pub struct ReconcileLogicalTablesProcedure {
pub context: ReconcileLogicalTablesContext,
state: Box<dyn State>,
@@ -173,6 +193,11 @@ impl Procedure for ReconcileLogicalTablesProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -180,8 +205,14 @@ impl Procedure for ReconcileLogicalTablesProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -28,9 +29,21 @@ pub struct ReconciliationEnd;
impl State for ReconciliationEnd {
async fn next(
&mut self,
_ctx: &mut ReconcileLogicalTablesContext,
_procedure_ctx: &ProcedureContext,
ctx: &mut ReconcileLogicalTablesContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let metrics = ctx.metrics();
info!(
"Logical tables reconciliation completed. logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}, metrics: {}",
ctx.persistent_ctx.logical_table_ids,
table_id,
table_name,
procedure_ctx.procedure_id,
metrics
);
Ok((Box::new(ReconciliationEnd), Status::done()))
}

View File

@@ -25,8 +25,11 @@ use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::ddl::utils::table_id::get_all_table_ids_by_names;
use crate::ddl::utils::table_info::all_logical_table_routes_have_same_physical_id;
use crate::error::{self, Result};
use crate::metrics;
use crate::reconciliation::reconcile_logical_tables::resolve_table_metadatas::ResolveTableMetadatas;
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::reconciliation::reconcile_logical_tables::{
ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
};
use crate::reconciliation::utils::check_column_metadatas_consistent;
/// The start state of the reconciliation procedure.
@@ -39,7 +42,7 @@ impl State for ReconciliationStart {
async fn next(
&mut self,
ctx: &mut ReconcileLogicalTablesContext,
_procedure_ctx: &ProcedureContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
@@ -58,35 +61,48 @@ impl State for ReconciliationStart {
}
);
info!(
"Starting reconciliation for logical table: table_id: {}, table_name: {}",
table_id, table_name
);
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
let region_metadatas = region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?;
let region_metadatas = {
let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
.with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
.start_timer();
region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?
};
ensure!(!region_metadatas.is_empty(), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileLogicalTablesProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_NO_REGION_METADATA,
])
.inc();
ensure!(
!region_metadatas.is_empty(),
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for table: {}, table_id: {}",
"No region metadata found for physical table: {}, table_id: {}",
table_name, table_id
),
}
);
});
if region_metadatas.iter().any(|r| r.is_none()) {
return error::UnexpectedSnafu {
ensure!(region_metadatas.iter().all(|r| r.is_some()), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileLogicalTablesProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_REGION_NOT_OPEN,
])
.inc();
error::UnexpectedSnafu {
err_msg: format!(
"Some regions of the physical table are not open. Table: {}, table_id: {}",
"Some regions of the physical table are not open. physical table: {}, table_id: {}",
table_name, table_id
),
}
.fail();
}
});
// Safety: checked above
let region_metadatas = region_metadatas
@@ -96,14 +112,13 @@ impl State for ReconciliationStart {
let _region_metadata = check_column_metadatas_consistent(&region_metadatas).context(
error::UnexpectedSnafu {
err_msg: format!(
"Column metadatas are not consistent for table: {}, table_id: {}",
"Column metadatas are not consistent for physical table: {}, table_id: {}",
table_name, table_id
),
},
)?;
// TODO(weny): ensure all columns in region metadata can be found in table info.
// Validates the logical tables.
Self::validate_schema(&ctx.persistent_ctx.logical_tables)?;
let table_refs = ctx
@@ -119,6 +134,12 @@ impl State for ReconciliationStart {
.await?;
Self::validate_logical_table_routes(ctx, &table_ids).await?;
let table_name = ctx.table_name();
info!(
"Starting reconciliation for logical tables: {:?}, physical_table_id: {}, table_name: {}, procedure_id: {}",
table_ids, table_id, table_name, procedure_ctx.procedure_id
);
ctx.persistent_ctx.physical_table_route = Some(physical_table_route);
ctx.persistent_ctx.logical_table_ids = table_ids;
Ok((Box::new(ResolveTableMetadatas), Status::executing(true)))

View File

@@ -22,8 +22,11 @@ use snafu::ensure;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
use crate::error::{self, Result};
use crate::metrics;
use crate::reconciliation::reconcile_logical_tables::reconcile_regions::ReconcileRegions;
use crate::reconciliation::reconcile_logical_tables::{ReconcileLogicalTablesContext, State};
use crate::reconciliation::reconcile_logical_tables::{
ReconcileLogicalTablesContext, ReconcileLogicalTablesProcedure, State,
};
use crate::reconciliation::utils::{
check_column_metadatas_consistent, need_update_logical_table_info,
};
@@ -65,22 +68,38 @@ impl State for ResolveTableMetadatas {
.unwrap()
.region_routes;
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
let mut metadata_consistent_count = 0;
let mut metadata_inconsistent_count = 0;
let mut create_tables_count = 0;
for (table_id, table_info_value) in table_ids.iter().zip(table_info_values.iter()) {
let region_metadatas = region_metadata_lister
.list(*table_id, region_routes)
.await?;
let region_metadatas = {
let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
.with_label_values(&[metrics::TABLE_TYPE_LOGICAL])
.start_timer();
region_metadata_lister
.list(*table_id, region_routes)
.await?
};
ensure!(!region_metadatas.is_empty(), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileLogicalTablesProcedure::TYPE_NAME,
metrics::TABLE_TYPE_LOGICAL,
metrics::STATS_TYPE_NO_REGION_METADATA,
])
.inc();
ensure!(
!region_metadatas.is_empty(),
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for table: {}, table_id: {}",
table_info_value.table_info.name, table_id
),
}
);
});
if region_metadatas.iter().any(|r| r.is_none()) {
create_tables_count += 1;
create_tables.push((*table_id, table_info_value.table_info.clone()));
continue;
}
@@ -91,10 +110,12 @@ impl State for ResolveTableMetadatas {
.map(|r| r.unwrap())
.collect::<Vec<_>>();
if let Some(column_metadatas) = check_column_metadatas_consistent(&region_metadatas) {
metadata_consistent_count += 1;
if need_update_logical_table_info(&table_info_value.table_info, &column_metadatas) {
update_table_infos.push((*table_id, column_metadatas));
}
} else {
metadata_inconsistent_count += 1;
// If the logical regions have inconsistent column metadatas, it won't affect read and write.
// It's safe to continue if the column metadatas of the logical table are inconsistent.
warn!(
@@ -121,6 +142,11 @@ impl State for ResolveTableMetadatas {
);
ctx.persistent_ctx.update_table_infos = update_table_infos;
ctx.persistent_ctx.create_tables = create_tables;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.column_metadata_consistent_count = metadata_consistent_count;
metrics.column_metadata_inconsistent_count = metadata_inconsistent_count;
metrics.create_tables_count = create_tables_count;
Ok((Box::new(ReconcileRegions), Status::executing(true)))
}

View File

@@ -96,6 +96,7 @@ impl State for UpdateTableInfos {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let updated_table_info_num = table_info_values_to_update.len();
batch_update_table_info_values(&ctx.table_metadata_manager, table_info_values_to_update)
.await?;
@@ -122,6 +123,9 @@ impl State for UpdateTableInfos {
.await?;
ctx.persistent_ctx.update_table_infos.clear();
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.update_table_info_count = updated_table_info_num;
Ok((Box::new(ReconciliationEnd), Status::executing(false)))
}

View File

@@ -40,10 +40,13 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_table::reconciliation_start::ReconciliationStart;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::{build_table_meta_from_column_metadatas, Context};
use crate::reconciliation::utils::{
build_table_meta_from_column_metadatas, Context, ReconcileTableMetrics,
};
pub struct ReconcileTableContext {
pub node_manager: NodeManagerRef,
@@ -65,13 +68,46 @@ impl ReconcileTableContext {
}
}
/// Returns the physical table name.
pub(crate) fn table_name(&self) -> &TableName {
&self.persistent_ctx.table_name
}
/// Returns the physical table id.
pub(crate) fn table_id(&self) -> TableId {
self.persistent_ctx.table_id
}
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
pub(crate) fn build_table_meta(
&self,
column_metadatas: &[ColumnMetadata],
) -> Result<RawTableMeta> {
// Safety: The table info value is set in `ReconciliationStart` state.
let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
let table_id = self.table_id();
let table_ref = self.table_name().table_ref();
let name_to_ids = table_info_value.table_info.name_to_ids();
let table_meta = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_info_value.table_info.meta,
name_to_ids,
column_metadatas,
)?;
Ok(table_meta)
}
/// Returns a mutable reference to the metrics.
pub(crate) fn mut_metrics(&mut self) -> &mut ReconcileTableMetrics {
&mut self.volatile_ctx.metrics
}
/// Returns a reference to the metrics.
pub(crate) fn metrics(&self) -> &ReconcileTableMetrics {
&self.volatile_ctx.metrics
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -110,29 +146,7 @@ impl PersistentContext {
#[derive(Default)]
pub(crate) struct VolatileContext {
pub(crate) table_meta: Option<RawTableMeta>,
}
impl ReconcileTableContext {
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
pub(crate) fn build_table_meta(
&self,
column_metadatas: &[ColumnMetadata],
) -> Result<RawTableMeta> {
// Safety: The table info value is set in `ReconciliationStart` state.
let table_info_value = self.persistent_ctx.table_info_value.as_ref().unwrap();
let table_id = self.table_id();
let table_ref = self.table_name().table_ref();
let name_to_ids = table_info_value.table_info.name_to_ids();
let table_meta = build_table_meta_from_column_metadatas(
table_id,
table_ref,
&table_info_value.table_info.meta,
name_to_ids,
column_metadatas,
)?;
Ok(table_meta)
}
pub(crate) metrics: ReconcileTableMetrics,
}
pub struct ReconcileTableProcedure {
@@ -191,6 +205,11 @@ impl Procedure for ReconcileTableProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let procedure_name = Self::TYPE_NAME;
let step = state.name();
let _timer = metrics::METRIC_META_RECONCILIATION_PROCEDURE
.with_label_values(&[procedure_name, step])
.start_timer();
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
@@ -198,8 +217,14 @@ impl Procedure for ReconcileTableProcedure {
}
Err(e) => {
if e.is_retry_later() {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_RETRYABLE])
.inc();
Err(ProcedureError::retry_later(e))
} else {
metrics::METRIC_META_RECONCILIATION_PROCEDURE_ERROR
.with_label_values(&[procedure_name, step, metrics::ERROR_TYPE_EXTERNAL])
.inc();
Err(ProcedureError::external(e))
}
}

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use tonic::async_trait;
@@ -31,9 +32,18 @@ pub struct ReconciliationEnd;
impl State for ReconciliationEnd {
async fn next(
&mut self,
_ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
ctx: &mut ReconcileTableContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
let metrics = ctx.metrics();
info!(
"Physical table reconciliation completed. table_name: {}, table_id: {}, procedure_id: {}, metrics: {}",
table_name, table_id, procedure_ctx.procedure_id, metrics
);
Ok((Box::new(ReconciliationEnd), Status::done()))
}

View File

@@ -20,9 +20,12 @@ use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::ddl::utils::region_metadata_lister::RegionMetadataLister;
use crate::error::{self, Result, UnexpectedSnafu};
use crate::error::{self, Result};
use crate::metrics::{self};
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveColumnMetadata;
use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
use crate::reconciliation::reconcile_table::{
ReconcileTableContext, ReconcileTableProcedure, State,
};
/// The start state of the reconciliation procedure.
///
@@ -40,7 +43,7 @@ impl State for ReconciliationStart {
async fn next(
&mut self,
ctx: &mut ReconcileTableContext,
_procedure_ctx: &ProcedureContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = ctx.table_id();
let table_name = ctx.table_name();
@@ -60,33 +63,56 @@ impl State for ReconciliationStart {
}
);
info!("Reconciling table: {}, table_id: {}", table_name, table_id);
info!(
"Reconciling table: {}, table_id: {}, procedure_id: {}",
table_name, table_id, procedure_ctx.procedure_id
);
// TODO(weny): Repairs the table route if needed.
let region_metadata_lister = RegionMetadataLister::new(ctx.node_manager.clone());
// Always list region metadatas for the physical table.
let region_metadatas = region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?;
ensure!(
!region_metadatas.is_empty(),
let region_metadatas = {
let _timer = metrics::METRIC_META_RECONCILIATION_LIST_REGION_METADATA_DURATION
.with_label_values(&[metrics::TABLE_TYPE_PHYSICAL])
.start_timer();
// Always list region metadatas for the physical table.
region_metadata_lister
.list(physical_table_id, &physical_table_route.region_routes)
.await?
};
ensure!(!region_metadatas.is_empty(), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_NO_REGION_METADATA,
])
.inc();
error::UnexpectedSnafu {
err_msg: format!(
"No region metadata found for table: {}, table_id: {}",
table_name, table_id
),
}
);
});
if region_metadatas.iter().any(|r| r.is_none()) {
return UnexpectedSnafu {
ensure!(region_metadatas.iter().all(|r| r.is_some()), {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
metrics::STATS_TYPE_REGION_NOT_OPEN,
])
.inc();
error::UnexpectedSnafu {
err_msg: format!(
"Some regions are not opened, table: {}, table_id: {}",
table_name, table_id
),
}
.fail();
}
});
// Persist the physical table route.
// TODO(weny): refetch the physical table route if repair is needed.

View File

@@ -20,6 +20,7 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::metadata::RegionMetadata;
use strum::AsRefStr;
use crate::error::{self, MissingColumnIdsSnafu, Result};
use crate::reconciliation::reconcile_table::reconcile_regions::ReconcileRegions;
@@ -28,10 +29,11 @@ use crate::reconciliation::reconcile_table::{ReconcileTableContext, State};
use crate::reconciliation::utils::{
build_column_metadata_from_table_info, check_column_metadatas_consistent,
resolve_column_metadatas_with_latest, resolve_column_metadatas_with_metasrv,
ResolveColumnMetadataResult,
};
/// Strategy for resolving column metadata inconsistencies.
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default, AsRefStr)]
pub enum ResolveStrategy {
#[default]
/// Trusts the latest column metadata from datanode.
@@ -98,6 +100,10 @@ impl State for ResolveColumnMetadata {
"Column metadatas are consistent for table: {}, table_id: {}.",
table_name, table_id
);
// Update metrics.
ctx.mut_metrics().resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Consistent);
return Ok((
Box::new(UpdateTableInfo::new(table_info_value, column_metadatas)),
Status::executing(false),
@@ -119,6 +125,11 @@ impl State for ResolveColumnMetadata {
let region_ids =
resolve_column_metadatas_with_metasrv(&column_metadata, &self.region_metadata)?;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
Ok((
Box::new(ReconcileRegions::new(column_metadata, region_ids)),
Status::executing(true),
@@ -127,16 +138,29 @@ impl State for ResolveColumnMetadata {
ResolveStrategy::UseLatest => {
let (column_metadatas, region_ids) =
resolve_column_metadatas_with_latest(&self.region_metadata)?;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
Ok((
Box::new(ReconcileRegions::new(column_metadatas, region_ids)),
Status::executing(true),
))
}
ResolveStrategy::AbortOnConflict => error::ColumnMetadataConflictsSnafu {
table_name: table_name.to_string(),
table_id,
ResolveStrategy::AbortOnConflict => {
let table_name = table_name.to_string();
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.resolve_column_metadata_result =
Some(ResolveColumnMetadataResult::Inconsistent(self.strategy));
error::ColumnMetadataConflictsSnafu {
table_name,
table_id,
}
.fail()
}
.fail(),
}
}

View File

@@ -116,6 +116,9 @@ impl State for UpdateTableInfo {
],
)
.await?;
// Update metrics.
let metrics = ctx.mut_metrics();
metrics.update_table_info = true;
Ok((Box::new(ReconciliationEnd), Status::executing(true)))
}

View File

@@ -13,23 +13,35 @@
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fmt::{self, Display};
use std::ops::AddAssign;
use std::time::Instant;
use api::v1::SemanticType;
use common_telemetry::warn;
use common_procedure::{watcher, Context as ProcedureContext, ProcedureId};
use common_telemetry::{error, warn};
use datatypes::schema::ColumnSchema;
use snafu::{ensure, OptionExt};
use futures::future::{join_all, try_join_all};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{RegionId, TableId};
use table::metadata::{RawTableInfo, RawTableMeta};
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::{
MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
ColumnIdMismatchSnafu, ColumnNotFoundSnafu, MismatchColumnIdSnafu,
MissingColumnInColumnMetadataSnafu, ProcedureStateReceiverNotFoundSnafu,
ProcedureStateReceiverSnafu, Result, TimestampMismatchSnafu, UnexpectedSnafu,
WaitProcedureSnafu,
};
use crate::key::TableMetadataManagerRef;
use crate::metrics;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct PartialRegionMetadata<'a> {
@@ -48,20 +60,6 @@ impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
}
}
/// A display wrapper for [`ColumnMetadata`] that formats the column metadata in a more readable way.
struct ColumnMetadataDisplay<'a>(pub &'a ColumnMetadata);
impl<'a> fmt::Debug for ColumnMetadataDisplay<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let col = self.0;
write!(
f,
"Column {{ name: {}, id: {}, semantic_type: {:?}, data_type: {:?} }}",
col.column_schema.name, col.column_id, col.semantic_type, col.column_schema.data_type,
)
}
}
/// Checks if the column metadatas are consistent.
///
/// The column metadatas are consistent if:
@@ -110,21 +108,7 @@ pub(crate) fn resolve_column_metadatas_with_metasrv(
let mut regions_ids = vec![];
for region_metadata in region_metadatas {
if region_metadata.column_metadatas != column_metadatas {
let is_invariant_preserved = check_column_metadata_invariants(
column_metadatas,
&region_metadata.column_metadatas,
);
ensure!(
is_invariant_preserved,
UnexpectedSnafu {
err_msg: format!(
"Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
region_metadata.region_id,
column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
)
}
);
check_column_metadata_invariants(column_metadatas, &region_metadata.column_metadatas)?;
regions_ids.push(region_metadata.region_id);
}
}
@@ -163,21 +147,10 @@ pub(crate) fn resolve_column_metadatas_with_latest(
let mut region_ids = vec![];
for region_metadata in region_metadatas {
if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
let is_invariant_preserved = check_column_metadata_invariants(
check_column_metadata_invariants(
&latest_region_metadata.column_metadatas,
&region_metadata.column_metadatas,
);
ensure!(
is_invariant_preserved,
UnexpectedSnafu {
err_msg: format!(
"Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
region_metadata.region_id,
latest_column_metadatas.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>()
)
}
);
)?;
region_ids.push(region_metadata.region_id);
}
}
@@ -239,7 +212,7 @@ pub(crate) fn build_column_metadata_from_table_info(
pub(crate) fn check_column_metadata_invariants(
new_column_metadatas: &[ColumnMetadata],
column_metadatas: &[ColumnMetadata],
) -> bool {
) -> Result<()> {
let new_primary_keys = new_column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
@@ -252,22 +225,50 @@ pub(crate) fn check_column_metadata_invariants(
.map(|c| (c.column_schema.name.as_str(), c.column_id));
for (name, id) in old_primary_keys {
if new_primary_keys.get(name) != Some(&id) {
return false;
}
let column_id = new_primary_keys
.get(name)
.cloned()
.context(ColumnNotFoundSnafu {
column_name: name,
column_id: id,
})?;
ensure!(
column_id == id,
ColumnIdMismatchSnafu {
column_name: name,
expected_column_id: id,
actual_column_id: column_id,
}
);
}
let new_ts_column = new_column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id));
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.context(UnexpectedSnafu {
err_msg: "Timestamp column not found in new column metadata",
})?;
let old_ts_column = column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| (c.column_schema.name.as_str(), c.column_id));
.map(|c| (c.column_schema.name.as_str(), c.column_id))
.context(UnexpectedSnafu {
err_msg: "Timestamp column not found in column metadata",
})?;
ensure!(
new_ts_column == old_ts_column,
TimestampMismatchSnafu {
expected_column_name: old_ts_column.0,
expected_column_id: old_ts_column.1,
actual_column_name: new_ts_column.0,
actual_column_id: new_ts_column.1,
}
);
new_ts_column == old_ts_column
Ok(())
}
/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
@@ -406,6 +407,88 @@ pub(crate) fn need_update_logical_table_info(
table_info.meta.schema.column_schemas.len() != column_metadatas.len()
}
/// The result of waiting for inflight subprocedures.
pub struct PartialSuccessResult<'a> {
pub failed_procedures: Vec<&'a SubprocedureMeta>,
pub success_procedures: Vec<&'a SubprocedureMeta>,
}
/// The result of waiting for inflight subprocedures.
pub enum WaitForInflightSubproceduresResult<'a> {
Success(Vec<&'a SubprocedureMeta>),
PartialSuccess(PartialSuccessResult<'a>),
}
/// Wait for inflight subprocedures.
///
/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
/// Otherwise, the function will continue waiting for all subprocedures to complete.
pub(crate) async fn wait_for_inflight_subprocedures<'a>(
procedure_ctx: &ProcedureContext,
subprocedures: &'a [SubprocedureMeta],
fail_fast: bool,
) -> Result<WaitForInflightSubproceduresResult<'a>> {
let mut receivers = Vec::with_capacity(subprocedures.len());
for subprocedure in subprocedures {
let procedure_id = subprocedure.procedure_id();
let receiver = procedure_ctx
.provider
.procedure_state_receiver(procedure_id)
.await
.context(ProcedureStateReceiverSnafu { procedure_id })?
.context(ProcedureStateReceiverNotFoundSnafu { procedure_id })?;
receivers.push((receiver, subprocedure));
}
let mut tasks = Vec::with_capacity(receivers.len());
for (receiver, subprocedure) in receivers.iter_mut() {
tasks.push(async move {
watcher::wait(receiver).await.inspect_err(|e| {
error!(e; "inflight subprocedure failed, parent procedure_id: {}, procedure: {}", procedure_ctx.procedure_id, subprocedure);
})
});
}
if fail_fast {
try_join_all(tasks).await.context(WaitProcedureSnafu)?;
return Ok(WaitForInflightSubproceduresResult::Success(
subprocedures.iter().collect(),
));
}
// If fail_fast is false, we need to wait for all subprocedures to complete.
let results = join_all(tasks).await;
let failed_procedures_num = results.iter().filter(|r| r.is_err()).count();
if failed_procedures_num == 0 {
return Ok(WaitForInflightSubproceduresResult::Success(
subprocedures.iter().collect(),
));
}
warn!(
"{} inflight subprocedures failed, total: {}, parent procedure_id: {}",
failed_procedures_num,
subprocedures.len(),
procedure_ctx.procedure_id
);
let mut failed_procedures = Vec::with_capacity(failed_procedures_num);
let mut success_procedures = Vec::with_capacity(subprocedures.len() - failed_procedures_num);
for (result, subprocedure) in results.into_iter().zip(subprocedures) {
if result.is_err() {
failed_procedures.push(subprocedure);
} else {
success_procedures.push(subprocedure);
}
}
Ok(WaitForInflightSubproceduresResult::PartialSuccess(
PartialSuccessResult {
failed_procedures,
success_procedures,
},
))
}
#[derive(Clone)]
pub struct Context {
pub node_manager: NodeManagerRef,
@@ -413,6 +496,446 @@ pub struct Context {
pub cache_invalidator: CacheInvalidatorRef,
}
/// Metadata for an inflight physical table subprocedure.
pub struct PhysicalTableMeta {
pub procedure_id: ProcedureId,
pub table_id: TableId,
pub table_name: TableName,
}
/// Metadata for an inflight logical table subprocedure.
pub struct LogicalTableMeta {
pub procedure_id: ProcedureId,
pub physical_table_id: TableId,
pub physical_table_name: TableName,
pub logical_tables: Vec<(TableId, TableName)>,
}
/// Metadata for an inflight database subprocedure.
pub struct ReconcileDatabaseMeta {
pub procedure_id: ProcedureId,
pub catalog: String,
pub schema: String,
}
/// The inflight subprocedure metadata.
pub enum SubprocedureMeta {
PhysicalTable(PhysicalTableMeta),
LogicalTable(LogicalTableMeta),
Database(ReconcileDatabaseMeta),
}
impl Display for SubprocedureMeta {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SubprocedureMeta::PhysicalTable(meta) => {
write!(
f,
"ReconcilePhysicalTable(procedure_id: {}, table_id: {}, table_name: {})",
meta.procedure_id, meta.table_id, meta.table_name
)
}
SubprocedureMeta::LogicalTable(meta) => {
write!(
f,
"ReconcileLogicalTable(procedure_id: {}, physical_table_id: {}, physical_table_name: {}, logical_tables: {:?})",
meta.procedure_id, meta.physical_table_id, meta.physical_table_name, meta.logical_tables
)
}
SubprocedureMeta::Database(meta) => {
write!(
f,
"ReconcileDatabase(procedure_id: {}, catalog: {}, schema: {})",
meta.procedure_id, meta.catalog, meta.schema
)
}
}
}
}
impl SubprocedureMeta {
/// Creates a new logical table subprocedure metadata.
pub fn new_logical_table(
procedure_id: ProcedureId,
physical_table_id: TableId,
physical_table_name: TableName,
logical_tables: Vec<(TableId, TableName)>,
) -> Self {
Self::LogicalTable(LogicalTableMeta {
procedure_id,
physical_table_id,
physical_table_name,
logical_tables,
})
}
/// Creates a new physical table subprocedure metadata.
pub fn new_physical_table(
procedure_id: ProcedureId,
table_id: TableId,
table_name: TableName,
) -> Self {
Self::PhysicalTable(PhysicalTableMeta {
procedure_id,
table_id,
table_name,
})
}
/// Creates a new reconcile database subprocedure metadata.
pub fn new_reconcile_database(
procedure_id: ProcedureId,
catalog: String,
schema: String,
) -> Self {
Self::Database(ReconcileDatabaseMeta {
procedure_id,
catalog,
schema,
})
}
/// Returns the procedure id of the subprocedure.
pub fn procedure_id(&self) -> ProcedureId {
match self {
SubprocedureMeta::PhysicalTable(meta) => meta.procedure_id,
SubprocedureMeta::LogicalTable(meta) => meta.procedure_id,
SubprocedureMeta::Database(meta) => meta.procedure_id,
}
}
/// Returns the number of tables will be reconciled.
pub fn table_num(&self) -> usize {
match self {
SubprocedureMeta::PhysicalTable(_) => 1,
SubprocedureMeta::LogicalTable(meta) => meta.logical_tables.len(),
SubprocedureMeta::Database(_) => 0,
}
}
/// Returns the number of databases will be reconciled.
pub fn database_num(&self) -> usize {
match self {
SubprocedureMeta::Database(_) => 1,
_ => 0,
}
}
}
/// The metrics of reconciling catalog.
#[derive(Clone, Default)]
pub struct ReconcileCatalogMetrics {
pub succeeded_databases: usize,
pub failed_databases: usize,
}
impl AddAssign for ReconcileCatalogMetrics {
fn add_assign(&mut self, other: Self) {
self.succeeded_databases += other.succeeded_databases;
self.failed_databases += other.failed_databases;
}
}
impl Display for ReconcileCatalogMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"succeeded_databases: {}, failed_databases: {}",
self.succeeded_databases, self.failed_databases
)
}
}
impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileCatalogMetrics {
fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
match result {
WaitForInflightSubproceduresResult::Success(subprocedures) => ReconcileCatalogMetrics {
succeeded_databases: subprocedures.len(),
failed_databases: 0,
},
WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
failed_procedures,
success_procedures,
}) => {
let succeeded_databases = success_procedures
.iter()
.map(|subprocedure| subprocedure.database_num())
.sum();
let failed_databases = failed_procedures
.iter()
.map(|subprocedure| subprocedure.database_num())
.sum();
ReconcileCatalogMetrics {
succeeded_databases,
failed_databases,
}
}
}
}
}
/// The metrics of reconciling database.
#[derive(Clone, Default)]
pub struct ReconcileDatabaseMetrics {
pub succeeded_tables: usize,
pub failed_tables: usize,
pub succeeded_procedures: usize,
pub failed_procedures: usize,
}
impl Display for ReconcileDatabaseMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "succeeded_tables: {}, failed_tables: {}, succeeded_procedures: {}, failed_procedures: {}", self.succeeded_tables, self.failed_tables, self.succeeded_procedures, self.failed_procedures)
}
}
impl AddAssign for ReconcileDatabaseMetrics {
fn add_assign(&mut self, other: Self) {
self.succeeded_tables += other.succeeded_tables;
self.failed_tables += other.failed_tables;
self.succeeded_procedures += other.succeeded_procedures;
self.failed_procedures += other.failed_procedures;
}
}
impl From<WaitForInflightSubproceduresResult<'_>> for ReconcileDatabaseMetrics {
fn from(result: WaitForInflightSubproceduresResult<'_>) -> Self {
match result {
WaitForInflightSubproceduresResult::Success(subprocedures) => {
let table_num = subprocedures
.iter()
.map(|subprocedure| subprocedure.table_num())
.sum();
ReconcileDatabaseMetrics {
succeeded_procedures: subprocedures.len(),
failed_procedures: 0,
succeeded_tables: table_num,
failed_tables: 0,
}
}
WaitForInflightSubproceduresResult::PartialSuccess(PartialSuccessResult {
failed_procedures,
success_procedures,
}) => {
let succeeded_tables = success_procedures
.iter()
.map(|subprocedure| subprocedure.table_num())
.sum();
let failed_tables = failed_procedures
.iter()
.map(|subprocedure| subprocedure.table_num())
.sum();
ReconcileDatabaseMetrics {
succeeded_procedures: success_procedures.len(),
failed_procedures: failed_procedures.len(),
succeeded_tables,
failed_tables,
}
}
}
}
}
/// The metrics of reconciling logical tables.
#[derive(Clone)]
pub struct ReconcileLogicalTableMetrics {
pub start_time: Instant,
pub update_table_info_count: usize,
pub create_tables_count: usize,
pub column_metadata_consistent_count: usize,
pub column_metadata_inconsistent_count: usize,
}
impl Default for ReconcileLogicalTableMetrics {
fn default() -> Self {
Self {
start_time: Instant::now(),
update_table_info_count: 0,
create_tables_count: 0,
column_metadata_consistent_count: 0,
column_metadata_inconsistent_count: 0,
}
}
}
const CREATE_TABLES: &str = "create_tables";
const UPDATE_TABLE_INFO: &str = "update_table_info";
const COLUMN_METADATA_CONSISTENT: &str = "column_metadata_consistent";
const COLUMN_METADATA_INCONSISTENT: &str = "column_metadata_inconsistent";
impl ReconcileLogicalTableMetrics {
/// The total number of tables that have been reconciled.
pub fn total_table_count(&self) -> usize {
self.create_tables_count
+ self.column_metadata_consistent_count
+ self.column_metadata_inconsistent_count
}
}
impl Drop for ReconcileLogicalTableMetrics {
fn drop(&mut self) {
let procedure_name = ReconcileLogicalTablesProcedure::TYPE_NAME;
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[procedure_name, metrics::TABLE_TYPE_LOGICAL, CREATE_TABLES])
.inc_by(self.create_tables_count as u64);
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
procedure_name,
metrics::TABLE_TYPE_LOGICAL,
UPDATE_TABLE_INFO,
])
.inc_by(self.update_table_info_count as u64);
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
procedure_name,
metrics::TABLE_TYPE_LOGICAL,
COLUMN_METADATA_CONSISTENT,
])
.inc_by(self.column_metadata_consistent_count as u64);
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
procedure_name,
metrics::TABLE_TYPE_LOGICAL,
COLUMN_METADATA_INCONSISTENT,
])
.inc_by(self.column_metadata_inconsistent_count as u64);
}
}
impl Display for ReconcileLogicalTableMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let elapsed = self.start_time.elapsed();
if self.create_tables_count > 0 {
write!(f, "create_tables_count: {}, ", self.create_tables_count)?;
}
if self.update_table_info_count > 0 {
write!(
f,
"update_table_info_count: {}, ",
self.update_table_info_count
)?;
}
if self.column_metadata_consistent_count > 0 {
write!(
f,
"column_metadata_consistent_count: {}, ",
self.column_metadata_consistent_count
)?;
}
if self.column_metadata_inconsistent_count > 0 {
write!(
f,
"column_metadata_inconsistent_count: {}, ",
self.column_metadata_inconsistent_count
)?;
}
write!(
f,
"total_table_count: {}, elapsed: {:?}",
self.total_table_count(),
elapsed
)
}
}
/// The result of resolving column metadata.
#[derive(Clone, Copy)]
pub enum ResolveColumnMetadataResult {
Consistent,
Inconsistent(ResolveStrategy),
}
impl Display for ResolveColumnMetadataResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ResolveColumnMetadataResult::Consistent => write!(f, "Consistent"),
ResolveColumnMetadataResult::Inconsistent(strategy) => {
let strategy_str = strategy.as_ref();
write!(f, "Inconsistent({})", strategy_str)
}
}
}
}
/// The metrics of reconciling physical tables.
#[derive(Clone)]
pub struct ReconcileTableMetrics {
/// The start time of the reconciliation.
pub start_time: Instant,
/// The result of resolving column metadata.
pub resolve_column_metadata_result: Option<ResolveColumnMetadataResult>,
/// Whether the table info has been updated.
pub update_table_info: bool,
}
impl Drop for ReconcileTableMetrics {
fn drop(&mut self) {
if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
match resolve_column_metadata_result {
ResolveColumnMetadataResult::Consistent => {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
COLUMN_METADATA_CONSISTENT,
])
.inc();
}
ResolveColumnMetadataResult::Inconsistent(strategy) => {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
COLUMN_METADATA_INCONSISTENT,
])
.inc();
metrics::METRIC_META_RECONCILIATION_RESOLVED_COLUMN_METADATA
.with_label_values(&[strategy.as_ref()])
.inc();
}
}
}
if self.update_table_info {
metrics::METRIC_META_RECONCILIATION_STATS
.with_label_values(&[
ReconcileTableProcedure::TYPE_NAME,
metrics::TABLE_TYPE_PHYSICAL,
UPDATE_TABLE_INFO,
])
.inc();
}
}
}
impl Default for ReconcileTableMetrics {
fn default() -> Self {
Self {
start_time: Instant::now(),
resolve_column_metadata_result: None,
update_table_info: false,
}
}
}
impl Display for ReconcileTableMetrics {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let elapsed = self.start_time.elapsed();
if let Some(resolve_column_metadata_result) = self.resolve_column_metadata_result {
write!(
f,
"resolve_column_metadata_result: {}, ",
resolve_column_metadata_result
)?;
}
write!(
f,
"update_table_info: {}, elapsed: {:?}",
self.update_table_info, elapsed
)
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
@@ -665,10 +1188,7 @@ mod tests {
semantic_type: SemanticType::Field,
column_id: 3,
});
assert!(check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap();
}
#[test]
@@ -676,18 +1196,12 @@ mod tests {
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
}
#[test]
@@ -700,10 +1214,7 @@ mod tests {
{
col.column_id = 100;
}
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
let column_metadatas = new_test_column_metadatas();
let mut new_column_metadatas = column_metadatas.clone();
@@ -713,10 +1224,7 @@ mod tests {
{
col.column_id = 100;
}
assert!(!check_column_metadata_invariants(
&new_column_metadatas,
&column_metadatas
));
check_column_metadata_invariants(&new_column_metadatas, &column_metadatas).unwrap_err();
}
#[test]

View File

@@ -639,16 +639,24 @@ async fn open_all_regions(
ignore_nonexistent_region,
)
.await?;
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
if !ignore_nonexistent_region {
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
} else if open_regions.len() != num_regions {
warn!(
"ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
num_regions,
open_regions.len()
);
}
for region_id in open_regions {
if open_with_writable {
@@ -688,16 +696,24 @@ async fn open_all_regions(
)
.await?;
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
if !ignore_nonexistent_region {
ensure!(
open_regions.len() == num_regions,
error::UnexpectedSnafu {
violated: format!(
"Expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
)
}
);
} else if open_regions.len() != num_regions {
warn!(
"ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
num_regions,
open_regions.len()
);
}
}
info!("all regions are opened");