feat: introduce reconcile catalog procedure (#6613)

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-01 19:03:00 +08:00
committed by GitHub
parent 19ad9a7f85
commit cfaa9b4dda
9 changed files with 430 additions and 89 deletions

View File

@@ -403,6 +403,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Catalog not found, catalog: {}", catalog))]
CatalogNotFound {
catalog: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid metadata, err: {}", err_msg))]
InvalidMetadata {
err_msg: String,
@@ -1062,6 +1069,7 @@ impl ErrorExt for Error {
ParseProcedureId { .. }
| InvalidNumTopics { .. }
| SchemaNotFound { .. }
| CatalogNotFound { .. }
| InvalidNodeInfoKey { .. }
| InvalidStatKey { .. }
| ParseNum { .. }

View File

@@ -23,4 +23,5 @@ pub(crate) mod reconcile_table;
pub(crate) mod reconcile_logical_tables;
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_catalog;
pub(crate) mod utils;

View File

@@ -0,0 +1,198 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::fmt::Debug;
use common_procedure::error::FromJsonSnafu;
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId,
Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::Result;
use crate::key::TableMetadataManagerRef;
use crate::lock_key::CatalogLock;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_catalog::start::ReconcileCatalogStart;
use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::Context;
pub(crate) mod end;
pub(crate) mod reconcile_databases;
pub(crate) mod start;
pub(crate) struct ReconcileCatalogContext {
pub node_manager: NodeManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
}
impl ReconcileCatalogContext {
pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
Self {
node_manager: ctx.node_manager,
table_metadata_manager: ctx.table_metadata_manager,
cache_invalidator: ctx.cache_invalidator,
persistent_ctx,
volatile_ctx: VolatileContext::default(),
}
}
pub(crate) async fn wait_for_inflight_subprocedure(
&mut self,
procedure_ctx: &ProcedureContext,
) -> Result<()> {
if let Some(procedure_id) = self.volatile_ctx.inflight_subprocedure {
wait_for_inflight_subprocedures(
procedure_ctx,
&[procedure_id],
self.persistent_ctx.fast_fail,
)
.await?;
}
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PersistentContext {
catalog: String,
fast_fail: bool,
resolve_strategy: ResolveStrategy,
}
impl PersistentContext {
pub fn new(catalog: String, fast_fail: bool, resolve_strategy: ResolveStrategy) -> Self {
Self {
catalog,
fast_fail,
resolve_strategy,
}
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
/// Stores the stream of catalogs.
schemas: Option<BoxStream<'static, Result<String>>>,
/// Stores the inflight subprocedure.
inflight_subprocedure: Option<ProcedureId>,
}
pub struct ReconcileCatalogProcedure {
pub context: ReconcileCatalogContext,
state: Box<dyn State>,
}
impl ReconcileCatalogProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileCatalog";
pub fn new(
ctx: Context,
catalog: String,
fast_fail: bool,
resolve_strategy: ResolveStrategy,
) -> Self {
let persistent_ctx = PersistentContext::new(catalog, fast_fail, resolve_strategy);
let context = ReconcileCatalogContext::new(ctx, persistent_ctx);
let state = Box::new(ReconcileCatalogStart);
Self { context, state }
}
pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
let ProcedureDataOwned {
state,
persistent_ctx,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let context = ReconcileCatalogContext::new(ctx, persistent_ctx);
Ok(Self { context, state })
}
}
#[derive(Debug, Serialize)]
struct ProcedureData<'a> {
state: &'a dyn State,
persistent_ctx: &'a PersistentContext,
}
#[derive(Debug, Deserialize)]
struct ProcedureDataOwned {
state: Box<dyn State>,
persistent_ctx: PersistentContext,
}
#[async_trait::async_trait]
impl Procedure for ReconcileCatalogProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
Ok(status)
}
Err(e) => {
if e.is_retry_later() {
Err(ProcedureError::retry_later(e))
} else {
Err(ProcedureError::external(e))
}
}
}
}
fn dump(&self) -> ProcedureResult<String> {
let data = ProcedureData {
state: self.state.as_ref(),
persistent_ctx: &self.context.persistent_ctx,
};
serde_json::to_string(&data).context(FromJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let catalog = &self.context.persistent_ctx.catalog;
LockKey::new(vec![CatalogLock::Write(catalog).into()])
}
}
#[async_trait::async_trait]
#[typetag::serde(tag = "reconcile_catalog_state")]
pub(crate) trait State: Sync + Send + Debug {
fn name(&self) -> &'static str {
let type_name = std::any::type_name::<Self>();
// short name
type_name.split("::").last().unwrap_or(type_name)
}
async fn next(
&mut self,
ctx: &mut ReconcileCatalogContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
fn as_any(&self) -> &dyn Any;
}

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileCatalogEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileCatalogEnd {
async fn next(
&mut self,
_ctx: &mut ReconcileCatalogContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(ReconcileCatalogEnd), Status::done()))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,94 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd;
use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM};
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileDatabases;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileDatabases {
async fn next(
&mut self,
ctx: &mut ReconcileCatalogContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
// Waits for inflight subprocedure first.
ctx.wait_for_inflight_subprocedure(procedure_ctx).await?;
if ctx.volatile_ctx.schemas.as_deref().is_none() {
let schemas = ctx
.table_metadata_manager
.schema_manager()
.schema_names(&ctx.persistent_ctx.catalog);
ctx.volatile_ctx.schemas = Some(schemas);
}
if let Some(catalog) = ctx
.volatile_ctx
.schemas
.as_mut()
.unwrap()
.try_next()
.await?
{
return Self::schedule_reconcile_database(ctx, catalog);
}
Ok((Box::new(ReconcileCatalogEnd), Status::executing(false)))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl ReconcileDatabases {
fn schedule_reconcile_database(
ctx: &mut ReconcileCatalogContext,
schema: String,
) -> Result<(Box<dyn State>, Status)> {
let context = Context {
node_manager: ctx.node_manager.clone(),
table_metadata_manager: ctx.table_metadata_manager.clone(),
cache_invalidator: ctx.cache_invalidator.clone(),
};
let procedure = ReconcileDatabaseProcedure::new(
context,
ctx.persistent_ctx.catalog.clone(),
schema,
ctx.persistent_ctx.fast_fail,
DEFAULT_PARALLELISM,
ctx.persistent_ctx.resolve_strategy,
true,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
Ok((
Box::new(ReconcileDatabases),
Status::suspended(vec![procedure_with_id], false),
))
}
}

View File

@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{self, Result};
use crate::key::catalog_name::CatalogNameKey;
use crate::reconciliation::reconcile_catalog::reconcile_databases::ReconcileDatabases;
use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileCatalogStart;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileCatalogStart {
async fn next(
&mut self,
ctx: &mut ReconcileCatalogContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let exists = ctx
.table_metadata_manager
.catalog_manager()
.exists(CatalogNameKey {
catalog: &ctx.persistent_ctx.catalog,
})
.await?;
ensure!(
exists,
error::CatalogNotFoundSnafu {
catalog: &ctx.persistent_ctx.catalog
},
);
Ok((Box::new(ReconcileDatabases), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -45,6 +45,8 @@ use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subproce
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::Context;
pub(crate) const DEFAULT_PARALLELISM: usize = 64;
pub(crate) struct ReconcileDatabaseContext {
pub node_manager: NodeManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
@@ -89,6 +91,7 @@ pub(crate) struct PersistentContext {
fail_fast: bool,
parallelism: usize,
resolve_strategy: ResolveStrategy,
is_subprocedure: bool,
}
impl PersistentContext {
@@ -98,6 +101,7 @@ impl PersistentContext {
fail_fast: bool,
parallelism: usize,
resolve_strategy: ResolveStrategy,
is_subprocedure: bool,
) -> Self {
Self {
catalog,
@@ -105,6 +109,7 @@ impl PersistentContext {
fail_fast,
parallelism,
resolve_strategy,
is_subprocedure,
}
}
}
@@ -139,9 +144,16 @@ impl ReconcileDatabaseProcedure {
fail_fast: bool,
parallelism: usize,
resolve_strategy: ResolveStrategy,
is_subprocedure: bool,
) -> Self {
let persistent_ctx =
PersistentContext::new(catalog, schema, fail_fast, parallelism, resolve_strategy);
let persistent_ctx = PersistentContext::new(
catalog,
schema,
fail_fast,
parallelism,
resolve_strategy,
is_subprocedure,
);
let context = ReconcileDatabaseContext::new(ctx, persistent_ctx);
let state = Box::new(ReconcileDatabaseStart);
Self { context, state }
@@ -204,6 +216,10 @@ impl Procedure for ReconcileDatabaseProcedure {
fn lock_key(&self) -> LockKey {
let catalog = &self.context.persistent_ctx.catalog;
let schema = &self.context.persistent_ctx.schema;
// If the procedure is a subprocedure, only lock the schema.
if self.context.persistent_ctx.is_subprocedure {
return LockKey::new(vec![SchemaLock::write(catalog, schema).into()]);
}
LockKey::new(vec![
CatalogLock::Read(catalog).into(),

View File

@@ -28,6 +28,7 @@ use crate::error::{Result, TableInfoNotFoundSnafu};
use crate::key::table_route::TableRouteValue;
use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
@@ -201,7 +202,7 @@ impl ReconcileLogicalTables {
async fn build_reconcile_logical_tables_procedure(
ctx: &Context,
physical_table_id: TableId,
_logical_tables: Vec<(TableId, TableName)>,
logical_tables: Vec<(TableId, TableName)>,
) -> Result<ProcedureWithId> {
let table_info = ctx
.table_metadata_manager
@@ -212,8 +213,16 @@ impl ReconcileLogicalTables {
table: format!("table_id: {}", physical_table_id),
})?;
let _physical_table_name = table_info.table_name();
todo!()
let physical_table_name = table_info.table_name();
let procedure = ReconcileLogicalTablesProcedure::new(
ctx.clone(),
physical_table_id,
physical_table_name,
logical_tables,
true,
);
Ok(ProcedureWithId::with_random_id(Box::new(procedure)))
}
fn enqueue_logical_table(

View File

@@ -22,14 +22,12 @@ use snafu::{ensure, OptionExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{RegionId, TableId};
use table::metadata::{RawTableInfo, RawTableMeta};
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::{
self, MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
};
use crate::key::table_name::{TableNameKey, TableNameManager};
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
@@ -397,87 +395,6 @@ pub(crate) fn build_table_meta_from_column_metadatas(
Ok(new_raw_table_meta)
}
/// Validates the table id and name consistency.
///
/// It will check the table id and table name consistency.
/// If the table id and table name are not consistent, it will return an error.
pub(crate) async fn validate_table_id_and_name(
table_name_manager: &TableNameManager,
table_id: TableId,
table_name: &TableName,
) -> Result<()> {
let table_name_key = TableNameKey::new(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
let table_name_value = table_name_manager
.get(table_name_key)
.await?
.with_context(|| error::TableNotFoundSnafu {
table_name: table_name.to_string(),
})?;
ensure!(
table_name_value.table_id() == table_id,
error::UnexpectedSnafu {
err_msg: format!(
"The table id mismatch for table: {}, expected {}, actual {}",
table_name,
table_id,
table_name_value.table_id()
),
}
);
Ok(())
}
/// Checks whether the column metadata invariants hold for the logical table.
///
/// Invariants:
/// - Primary key (Tag) columns must exist in the new metadata.
/// - Timestamp column must remain exactly the same in name and ID.
///
/// TODO(weny): add tests
pub(crate) fn check_column_metadatas_invariants_for_logical_table(
column_metadatas: &[ColumnMetadata],
table_info: &RawTableInfo,
) -> bool {
let new_primary_keys = column_metadatas
.iter()
.filter(|c| c.semantic_type == SemanticType::Tag)
.map(|c| c.column_schema.name.as_str())
.collect::<HashSet<_>>();
let old_primary_keys = table_info
.meta
.primary_key_indices
.iter()
.map(|i| table_info.meta.schema.column_schemas[*i].name.as_str());
for name in old_primary_keys {
if !new_primary_keys.contains(name) {
return false;
}
}
let old_timestamp_column_name = table_info
.meta
.schema
.column_schemas
.iter()
.find(|c| c.is_time_index())
.map(|c| c.name.as_str());
let new_timestamp_column_name = column_metadatas
.iter()
.find(|c| c.semantic_type == SemanticType::Timestamp)
.map(|c| c.column_schema.name.as_str());
old_timestamp_column_name != new_timestamp_column_name
}
/// Returns true if the logical table info needs to be updated.
///
/// The logical table only support to add columns, so we can check the length of column metadatas