feat: introduce reconcile database procedure (#6612)

* feat: introduce reconcile database procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: hold the schema lock

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add todo

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: rename to `fast_fail`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-31 19:01:56 +08:00
committed by WenyXu
parent 1434582cc3
commit d3a1c80fbd
16 changed files with 908 additions and 1 deletions

1
Cargo.lock generated
View File

@@ -2593,6 +2593,7 @@ dependencies = [
"async-trait",
"common-procedure",
"snafu 0.8.5",
"tokio",
]
[[package]]

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_procedure::ProcedureId;
use common_wal::options::WalOptions;
use serde_json::error::Error as JsonError;
use snafu::{Location, Snafu};
@@ -140,6 +141,21 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to get procedure state receiver, procedure id: {procedure_id}"))]
ProcedureStateReceiver {
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
source: common_procedure::Error,
},
#[snafu(display("Procedure state receiver not found: {procedure_id}"))]
ProcedureStateReceiverNotFound {
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to wait procedure done"))]
WaitProcedure {
#[snafu(implicit)]
@@ -952,6 +968,7 @@ impl ErrorExt for Error {
NoLeader { .. } => StatusCode::TableUnavailable,
ValueNotExist { .. }
| ProcedurePoisonConflict { .. }
| ProcedureStateReceiverNotFound { .. }
| MissingColumnIds { .. }
| MissingColumnInColumnMetadata { .. }
| MismatchColumnId { .. }
@@ -1040,6 +1057,7 @@ impl ErrorExt for Error {
ConvertAlterTableRequest { source, .. } => source.status_code(),
PutPoison { source, .. } => source.status_code(),
ConvertColumnDef { source, .. } => source.status_code(),
ProcedureStateReceiver { source, .. } => source.status_code(),
ParseProcedureId { .. }
| InvalidNumTopics { .. }

View File

@@ -184,6 +184,17 @@ impl TableRouteValue {
}
}
/// Converts to [`LogicalTableRouteValue`].
///
/// # Panic
/// If it is not the [`LogicalTableRouteValue`].
pub fn into_logical_table_route(self) -> LogicalTableRouteValue {
match self {
TableRouteValue::Logical(x) => x,
_ => unreachable!("Mistakenly been treated as a Logical TableRoute: {self:?}"),
}
}
pub fn region_numbers(&self) -> Vec<RegionNumber> {
match self {
TableRouteValue::Physical(x) => x

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_database;
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_table;

View File

@@ -0,0 +1,231 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod end;
pub(crate) mod reconcile_logical_tables;
pub(crate) mod reconcile_tables;
pub(crate) mod start;
pub(crate) mod utils;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, ProcedureId,
Result as ProcedureResult, Status,
};
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::TableId;
use table::table_name::TableName;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::Result;
use crate::key::table_name::TableNameValue;
use crate::key::TableMetadataManagerRef;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_database::start::ReconcileDatabaseStart;
use crate::reconciliation::reconcile_database::utils::wait_for_inflight_subprocedures;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::utils::Context;
pub(crate) struct ReconcileDatabaseContext {
pub node_manager: NodeManagerRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
}
impl ReconcileDatabaseContext {
pub fn new(ctx: Context, persistent_ctx: PersistentContext) -> Self {
Self {
node_manager: ctx.node_manager,
table_metadata_manager: ctx.table_metadata_manager,
cache_invalidator: ctx.cache_invalidator,
persistent_ctx,
volatile_ctx: VolatileContext::default(),
}
}
pub(crate) async fn wait_for_inflight_subprocedures(
&mut self,
procedure_ctx: &ProcedureContext,
) -> Result<()> {
if !self.volatile_ctx.inflight_subprocedures.is_empty() {
wait_for_inflight_subprocedures(
procedure_ctx,
&self.volatile_ctx.inflight_subprocedures,
self.persistent_ctx.fail_fast,
)
.await?;
self.volatile_ctx.inflight_subprocedures.clear();
}
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct PersistentContext {
catalog: String,
schema: String,
fail_fast: bool,
parallelism: usize,
resolve_strategy: ResolveStrategy,
}
impl PersistentContext {
pub fn new(
catalog: String,
schema: String,
fail_fast: bool,
parallelism: usize,
resolve_strategy: ResolveStrategy,
) -> Self {
Self {
catalog,
schema,
fail_fast,
parallelism,
resolve_strategy,
}
}
}
#[derive(Default)]
pub(crate) struct VolatileContext {
/// Stores pending physical tables.
pending_tables: Vec<(TableId, TableName)>,
/// Stores pending logical tables associated with each physical table.
///
/// - Key: Table ID of the physical table.
/// - Value: Vector of (TableId, TableName) tuples representing logical tables belonging to the physical table.
pending_logical_tables: HashMap<TableId, Vec<(TableId, TableName)>>,
/// Stores inflight subprocedures.
inflight_subprocedures: Vec<ProcedureId>,
/// Stores the stream of tables.
tables: Option<BoxStream<'static, Result<(String, TableNameValue)>>>,
}
pub struct ReconcileDatabaseProcedure {
pub context: ReconcileDatabaseContext,
state: Box<dyn State>,
}
impl ReconcileDatabaseProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::ReconcileDatabase";
pub fn new(
ctx: Context,
catalog: String,
schema: String,
fail_fast: bool,
parallelism: usize,
resolve_strategy: ResolveStrategy,
) -> Self {
let persistent_ctx =
PersistentContext::new(catalog, schema, fail_fast, parallelism, resolve_strategy);
let context = ReconcileDatabaseContext::new(ctx, persistent_ctx);
let state = Box::new(ReconcileDatabaseStart);
Self { context, state }
}
pub(crate) fn from_json(ctx: Context, json: &str) -> ProcedureResult<Self> {
let ProcedureDataOwned {
state,
persistent_ctx,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let context = ReconcileDatabaseContext::new(ctx, persistent_ctx);
Ok(Self { context, state })
}
}
#[derive(Debug, Serialize)]
struct ProcedureData<'a> {
state: &'a dyn State,
persistent_ctx: &'a PersistentContext,
}
#[derive(Debug, Deserialize)]
struct ProcedureDataOwned {
state: Box<dyn State>,
persistent_ctx: PersistentContext,
}
#[async_trait]
impl Procedure for ReconcileDatabaseProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
Ok(status)
}
Err(e) => {
if e.is_retry_later() {
Err(ProcedureError::retry_later(e))
} else {
Err(ProcedureError::external(e))
}
}
}
}
fn dump(&self) -> ProcedureResult<String> {
let data = ProcedureData {
state: self.state.as_ref(),
persistent_ctx: &self.context.persistent_ctx,
};
serde_json::to_string(&data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let catalog = &self.context.persistent_ctx.catalog;
let schema = &self.context.persistent_ctx.schema;
LockKey::new(vec![
CatalogLock::Read(catalog).into(),
SchemaLock::write(catalog, schema).into(),
])
}
}
#[async_trait::async_trait]
#[typetag::serde(tag = "reconcile_database_state")]
pub(crate) trait State: Sync + Send + Debug {
fn name(&self) -> &'static str {
let type_name = std::any::type_name::<Self>();
// short name
type_name.split("::").last().unwrap_or(type_name)
}
async fn next(
&mut self,
ctx: &mut ReconcileDatabaseContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
fn as_any(&self) -> &dyn Any;
}

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileDatabaseEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileDatabaseEnd {
async fn next(
&mut self,
_ctx: &mut ReconcileDatabaseContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(ReconcileDatabaseEnd), Status::done()))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,234 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use common_telemetry::info;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::error::{Result, TableInfoNotFoundSnafu};
use crate::key::table_route::TableRouteValue;
use crate::reconciliation::reconcile_database::end::ReconcileDatabaseEnd;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileLogicalTables;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileLogicalTables {
async fn next(
&mut self,
ctx: &mut ReconcileDatabaseContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
info!(
"Reconcile logical tables in database: {}, catalog: {}, inflight_subprocedures: {}",
ctx.persistent_ctx.schema,
ctx.persistent_ctx.catalog,
ctx.volatile_ctx.inflight_subprocedures.len()
);
// Waits for inflight subprocedures first.
ctx.wait_for_inflight_subprocedures(procedure_ctx).await?;
let catalog = &ctx.persistent_ctx.catalog;
let schema = &ctx.persistent_ctx.schema;
let parallelism = ctx.persistent_ctx.parallelism;
if ctx.volatile_ctx.tables.as_deref().is_none() {
let tables = ctx
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema);
ctx.volatile_ctx.tables = Some(tables);
}
let pending_logical_tables = &mut ctx.volatile_ctx.pending_logical_tables;
let mut pending_procedures = Vec::with_capacity(parallelism);
let context = Context {
node_manager: ctx.node_manager.clone(),
table_metadata_manager: ctx.table_metadata_manager.clone(),
cache_invalidator: ctx.cache_invalidator.clone(),
};
// Safety: initialized above.
while let Some((table_name, table_name_value)) =
ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await?
{
let table_id = table_name_value.table_id();
let Some(table_route) = ctx
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await?
else {
continue;
};
let table_ref = TableReference::full(catalog, schema, &table_name);
Self::enqueue_logical_table(pending_logical_tables, table_id, table_ref, table_route);
// Try to build reconcile logical tables procedure.
if let Some(procedure) = Self::try_build_reconcile_logical_tables_procedure(
&context,
pending_logical_tables,
parallelism,
)
.await?
{
pending_procedures.push(procedure);
}
// Schedule reconcile logical tables procedures if the number of pending procedures
// is greater than or equal to parallelism.
if Self::should_schedule_reconcile_logical_tables(&pending_procedures, parallelism) {
return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures);
}
}
// Build remaining procedures.
Self::build_remaining_procedures(
&context,
pending_logical_tables,
&mut pending_procedures,
parallelism,
)
.await?;
// If there are remaining procedures, schedule reconcile logical tables procedures.
if !pending_procedures.is_empty() {
return Self::schedule_reconcile_logical_tables(ctx, &mut pending_procedures);
}
ctx.volatile_ctx.tables.take();
Ok((Box::new(ReconcileDatabaseEnd), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl ReconcileLogicalTables {
fn schedule_reconcile_logical_tables(
ctx: &mut ReconcileDatabaseContext,
buffer: &mut Vec<ProcedureWithId>,
) -> Result<(Box<dyn State>, Status)> {
let procedures = std::mem::take(buffer);
ctx.volatile_ctx
.inflight_subprocedures
.extend(procedures.iter().map(|p| p.id));
Ok((
Box::new(ReconcileLogicalTables),
Status::suspended(procedures, false),
))
}
fn should_schedule_reconcile_logical_tables(
buffer: &[ProcedureWithId],
parallelism: usize,
) -> bool {
buffer.len() >= parallelism
}
async fn try_build_reconcile_logical_tables_procedure(
ctx: &Context,
pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
parallelism: usize,
) -> Result<Option<ProcedureWithId>> {
let mut physical_table_id = None;
for (table_id, tables) in pending_logical_tables.iter() {
if tables.len() >= parallelism {
physical_table_id = Some(*table_id);
break;
}
}
if let Some(physical_table_id) = physical_table_id {
// Safety: Checked above.
let tables = pending_logical_tables.remove(&physical_table_id).unwrap();
return Ok(Some(
Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables)
.await?,
));
}
Ok(None)
}
async fn build_remaining_procedures(
ctx: &Context,
pending_logical_tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
pending_procedures: &mut Vec<ProcedureWithId>,
parallelism: usize,
) -> Result<()> {
if pending_logical_tables.is_empty() {
return Ok(());
}
while let Some(physical_table_id) = pending_logical_tables.keys().next().cloned() {
if pending_procedures.len() >= parallelism {
return Ok(());
}
// Safety: Checked above.
let tables = pending_logical_tables.remove(&physical_table_id).unwrap();
pending_procedures.push(
Self::build_reconcile_logical_tables_procedure(ctx, physical_table_id, tables)
.await?,
);
}
Ok(())
}
async fn build_reconcile_logical_tables_procedure(
ctx: &Context,
physical_table_id: TableId,
_logical_tables: Vec<(TableId, TableName)>,
) -> Result<ProcedureWithId> {
let table_info = ctx
.table_metadata_manager
.table_info_manager()
.get(physical_table_id)
.await?
.context(TableInfoNotFoundSnafu {
table: format!("table_id: {}", physical_table_id),
})?;
let _physical_table_name = table_info.table_name();
todo!()
}
fn enqueue_logical_table(
tables: &mut HashMap<TableId, Vec<(TableId, TableName)>>,
table_id: TableId,
table_ref: TableReference<'_>,
table_route: TableRouteValue,
) {
if !table_route.is_physical() {
let logical_table_route = table_route.into_logical_table_route();
let physical_table_id = logical_table_route.physical_table_id();
tables
.entry(physical_table_id)
.or_default()
.push((table_id, table_ref.into()));
}
}
}

View File

@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use common_telemetry::info;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use store_api::storage::TableId;
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::reconciliation::reconcile_database::reconcile_logical_tables::ReconcileLogicalTables;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileTables;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileTables {
async fn next(
&mut self,
ctx: &mut ReconcileDatabaseContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
info!(
"Reconcile tables in database: {}, catalog: {}, inflight_subprocedures: {}",
ctx.persistent_ctx.schema,
ctx.persistent_ctx.catalog,
ctx.volatile_ctx.inflight_subprocedures.len()
);
// Waits for inflight subprocedures first.
ctx.wait_for_inflight_subprocedures(procedure_ctx).await?;
let catalog = &ctx.persistent_ctx.catalog;
let schema = &ctx.persistent_ctx.schema;
let parallelism = ctx.persistent_ctx.parallelism;
if ctx.volatile_ctx.tables.as_deref().is_none() {
let tables = ctx
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema);
ctx.volatile_ctx.tables = Some(tables);
}
let pending_tables = &mut ctx.volatile_ctx.pending_tables;
// Safety: must exists.
while let Some((table_name, table_name_value)) =
ctx.volatile_ctx.tables.as_mut().unwrap().try_next().await?
{
let table_id = table_name_value.table_id();
let Some(table_route) = ctx
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(table_id)
.await?
else {
continue;
};
let table_ref = TableReference::full(catalog, schema, &table_name);
// Enqueue table.
Self::enqueue_table(pending_tables, table_id, table_ref, table_route);
// Schedule reconcile table procedures if the number of pending procedures
// is greater than or equal to parallelism.
if Self::should_schedule_reconcile_tables(pending_tables, parallelism) {
return Self::schedule_reconcile_tables(ctx);
}
}
// If there are remaining tables, schedule reconcile table procedures.
if !pending_tables.is_empty() {
return Self::schedule_reconcile_tables(ctx);
}
ctx.volatile_ctx.tables.take();
Ok((Box::new(ReconcileLogicalTables), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl ReconcileTables {
fn schedule_reconcile_tables(
ctx: &mut ReconcileDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
let tables = std::mem::take(&mut ctx.volatile_ctx.pending_tables);
let subprocedures = Self::build_reconcile_table_procedures(ctx, tables);
ctx.volatile_ctx
.inflight_subprocedures
.extend(subprocedures.iter().map(|p| p.id));
Ok((
Box::new(ReconcileTables),
Status::suspended(subprocedures, false),
))
}
fn should_schedule_reconcile_tables(
pending_tables: &[(TableId, TableName)],
parallelism: usize,
) -> bool {
pending_tables.len() >= parallelism
}
fn build_reconcile_table_procedures(
ctx: &ReconcileDatabaseContext,
tables: Vec<(TableId, TableName)>,
) -> Vec<ProcedureWithId> {
let mut procedures = Vec::with_capacity(tables.len());
for (table_id, table_name) in tables {
let context = Context {
node_manager: ctx.node_manager.clone(),
table_metadata_manager: ctx.table_metadata_manager.clone(),
cache_invalidator: ctx.cache_invalidator.clone(),
};
let procedure = ReconcileTableProcedure::new(
context,
table_id,
table_name.clone(),
ctx.persistent_ctx.resolve_strategy,
true,
);
let procedure = ProcedureWithId::with_random_id(Box::new(procedure));
info!(
"Reconcile table: {}, table_id: {}, procedure_id: {}",
table_name, table_id, procedure.id
);
procedures.push(procedure)
}
procedures
}
fn enqueue_table(
tables: &mut Vec<(TableId, TableName)>,
table_id: TableId,
table_ref: TableReference<'_>,
table_route: TableRouteValue,
) {
if table_route.is_physical() {
tables.push((table_id, table_ref.into()));
}
}
}

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{self, Result};
use crate::key::schema_name::SchemaNameKey;
use crate::reconciliation::reconcile_database::reconcile_tables::ReconcileTables;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseContext, State};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ReconcileDatabaseStart;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ReconcileDatabaseStart {
async fn next(
&mut self,
ctx: &mut ReconcileDatabaseContext,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let exists = ctx
.table_metadata_manager
.schema_manager()
.exists(SchemaNameKey {
catalog: &ctx.persistent_ctx.catalog,
schema: &ctx.persistent_ctx.schema,
})
.await?;
ensure!(
exists,
error::SchemaNotFoundSnafu {
table_schema: &ctx.persistent_ctx.schema,
},
);
info!(
"Reconcile database: {}, catalog: {}",
ctx.persistent_ctx.schema, ctx.persistent_ctx.catalog
);
Ok((Box::new(ReconcileTables), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,79 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_procedure::{watcher, Context as ProcedureContext, ProcedureId};
use common_telemetry::{error, info, warn};
use futures::future::{join_all, try_join_all};
use snafu::{OptionExt, ResultExt};
use crate::error::{
ProcedureStateReceiverNotFoundSnafu, ProcedureStateReceiverSnafu, Result, WaitProcedureSnafu,
};
/// Wait for inflight subprocedures.
///
/// If `fail_fast` is true, the function will return an error if any subprocedure fails.
/// Otherwise, the function will continue waiting for all subprocedures to complete.
pub(crate) async fn wait_for_inflight_subprocedures(
procedure_ctx: &ProcedureContext,
subprocedures: &[ProcedureId],
fail_fast: bool,
) -> Result<()> {
let mut receivers = Vec::with_capacity(subprocedures.len());
for procedure_id in subprocedures {
let receiver = procedure_ctx
.provider
.procedure_state_receiver(*procedure_id)
.await
.context(ProcedureStateReceiverSnafu {
procedure_id: *procedure_id,
})?
.context(ProcedureStateReceiverNotFoundSnafu {
procedure_id: *procedure_id,
})?;
receivers.push(receiver);
}
let mut tasks = Vec::with_capacity(receivers.len());
for receiver in receivers.iter_mut() {
let fut = watcher::wait(receiver);
tasks.push(fut);
}
if fail_fast {
try_join_all(tasks).await.context(WaitProcedureSnafu)?;
} else {
let mut failed = 0;
let total = tasks.len();
for result in join_all(tasks).await {
if let Err(e) = result {
error!(e; "inflight subprocedure, procedure_id: {}", procedure_ctx.procedure_id);
failed += 1;
}
}
if failed > 0 {
warn!(
"{} inflight subprocedures failed, total: {}, procedure_id: {}",
failed, total, procedure_ctx.procedure_id
);
} else {
info!(
"{} inflight subprocedures completed, procedure_id: {}",
total, procedure_ctx.procedure_id
);
}
}
Ok(())
}

View File

@@ -85,6 +85,8 @@ pub(crate) struct PersistentContext {
// The physical table route.
// The value will be set in `ReconciliationStart` state.
pub(crate) physical_table_route: Option<PhysicalTableRouteValue>,
// Whether the procedure is a subprocedure.
pub(crate) is_subprocedure: bool,
}
impl PersistentContext {
@@ -92,6 +94,7 @@ impl PersistentContext {
table_id: TableId,
table_name: TableName,
resolve_strategy: ResolveStrategy,
is_subprocedure: bool,
) -> Self {
Self {
table_id,
@@ -99,6 +102,7 @@ impl PersistentContext {
resolve_strategy,
table_info_value: None,
physical_table_route: None,
is_subprocedure,
}
}
}
@@ -143,8 +147,10 @@ impl ReconcileTableProcedure {
table_id: TableId,
table_name: TableName,
resolve_strategy: ResolveStrategy,
is_subprocedure: bool,
) -> Self {
let persistent_ctx = PersistentContext::new(table_id, table_name, resolve_strategy);
let persistent_ctx =
PersistentContext::new(table_id, table_name, resolve_strategy, is_subprocedure);
let context = ReconcileTableContext::new(ctx, persistent_ctx);
let state = Box::new(ReconciliationStart);
Self { context, state }
@@ -211,6 +217,17 @@ impl Procedure for ReconcileTableProcedure {
fn lock_key(&self) -> LockKey {
let table_ref = &self.context.table_name().table_ref();
if self.context.persistent_ctx.is_subprocedure {
// The catalog and schema are already locked by the parent procedure.
// Only lock the table name.
return LockKey::new(vec![TableNameLock::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
)
.into()]);
}
LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
SchemaLock::read(table_ref.catalog, table_ref.schema).into(),

View File

@@ -11,3 +11,4 @@ workspace = true
async-trait.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
snafu.workspace = true
tokio.workspace = true

View File

@@ -26,6 +26,7 @@ use common_procedure::{
Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
ProcedureWithId, Result, Status, StringKey,
};
use tokio::sync::watch::Receiver;
/// A Mock [ContextProvider].
#[derive(Default)]
@@ -57,6 +58,13 @@ impl ContextProvider for MockContextProvider {
Ok(self.states.get(&procedure_id).cloned())
}
async fn procedure_state_receiver(
&self,
_procedure_id: ProcedureId,
) -> Result<Option<Receiver<ProcedureState>>> {
Ok(None)
}
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
self.poison_manager
.try_put_poison(key.to_string(), procedure_id.to_string())

View File

@@ -247,6 +247,13 @@ impl ContextProvider for ManagerContext {
Ok(self.state(procedure_id))
}
async fn procedure_state_receiver(
&self,
procedure_id: ProcedureId,
) -> Result<Option<Receiver<ProcedureState>>> {
Ok(self.state_receiver(procedure_id))
}
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
{
// validate the procedure exists
@@ -345,6 +352,14 @@ impl ManagerContext {
procedures.get(&procedure_id).map(|meta| meta.state())
}
/// Returns the [Receiver<ProcedureState>] of specific `procedure_id`.
fn state_receiver(&self, procedure_id: ProcedureId) -> Option<Receiver<ProcedureState>> {
let procedures = self.procedures.read().unwrap();
procedures
.get(&procedure_id)
.map(|meta| meta.state_receiver.clone())
}
/// Returns the [ProcedureMeta] of all procedures.
fn list_procedure(&self) -> Vec<ProcedureInfo> {
let procedures = self.procedures.read().unwrap();

View File

@@ -601,6 +601,7 @@ mod tests {
use futures_util::FutureExt;
use object_store::{EntryMode, ObjectStore};
use tokio::sync::mpsc;
use tokio::sync::watch::Receiver;
use super::*;
use crate::local::{test_util, DynamicKeyLockGuard};
@@ -668,6 +669,13 @@ mod tests {
unimplemented!()
}
async fn procedure_state_receiver(
&self,
_procedure_id: ProcedureId,
) -> Result<Option<Receiver<ProcedureState>>> {
unimplemented!()
}
async fn try_put_poison(
&self,
_key: &PoisonKey,

View File

@@ -22,6 +22,7 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use snafu::{ResultExt, Snafu};
use tokio::sync::watch::Receiver;
use uuid::Uuid;
use crate::error::{self, Error, Result};
@@ -58,6 +59,14 @@ pub enum Status {
}
impl Status {
/// Returns a [Status::Suspended] with given `subprocedures` and `persist` flag.
pub fn suspended(subprocedures: Vec<ProcedureWithId>, persist: bool) -> Status {
Status::Suspended {
subprocedures,
persist,
}
}
/// Returns a [Status::Poisoned] with given `keys` and `error`.
pub fn poisoned(keys: impl IntoIterator<Item = PoisonKey>, error: Error) -> Status {
Status::Poisoned {
@@ -140,6 +149,11 @@ pub trait ContextProvider: Send + Sync {
/// Query the procedure state.
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>>;
async fn procedure_state_receiver(
&self,
procedure_id: ProcedureId,
) -> Result<Option<Receiver<ProcedureState>>>;
/// Try to put a poison key for a procedure.
///
/// This method is used to mark a resource as being operated on by a procedure.