feat: add integration tests for table reconciliation procedures part1 (#6705)

* feat: add integration tests for table reconciliation procedures

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-13 11:29:39 +08:00
committed by GitHub
parent 2992e70393
commit 1d84e802d8
7 changed files with 711 additions and 39 deletions

View File

@@ -18,3 +18,4 @@ pub(crate) mod reconcile_database;
pub(crate) mod reconcile_logical_tables;
pub(crate) mod reconcile_table;
pub(crate) mod utils;
pub use reconcile_table::resolve_column_metadata::ResolveStrategy;

View File

@@ -117,7 +117,8 @@ impl ReconciliationManager {
.await?;
if physical_table_id == table_id {
Ok(self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy))
self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy)
.await
} else {
let physical_table_info = table_metadata_manager
.table_info_manager()
@@ -127,24 +128,25 @@ impl ReconciliationManager {
table_name: format!("table_id: {}", physical_table_id),
})?;
Ok(self.reconcile_logical_tables(
self.reconcile_logical_tables(
physical_table_id,
physical_table_info.table_name(),
vec![(table_id, table_ref.into())],
))
)
.await
}
}
/// Reconcile a database.
///
/// Returns the procedure id of the reconciliation procedure.
pub fn reconcile_database(
pub async fn reconcile_database(
&self,
catalog: String,
schema: String,
resolve_strategy: ResolveStrategy,
parallelism: usize,
) -> ProcedureId {
) -> Result<ProcedureId> {
let parallelism = normalize_parallelism(parallelism);
let procedure = ReconcileDatabaseProcedure::new(
self.context.clone(),
@@ -155,15 +157,15 @@ impl ReconciliationManager {
resolve_strategy,
false,
);
self.spawn_procedure(Box::new(procedure))
self.spawn_procedure(Box::new(procedure)).await
}
fn reconcile_physical_table(
async fn reconcile_physical_table(
&self,
table_id: TableId,
table_name: TableName,
resolve_strategy: ResolveStrategy,
) -> ProcedureId {
) -> Result<ProcedureId> {
let procedure = ReconcileTableProcedure::new(
self.context.clone(),
table_id,
@@ -171,15 +173,15 @@ impl ReconciliationManager {
resolve_strategy,
false,
);
self.spawn_procedure(Box::new(procedure))
self.spawn_procedure(Box::new(procedure)).await
}
fn reconcile_logical_tables(
async fn reconcile_logical_tables(
&self,
physical_table_id: TableId,
physical_table_name: TableName,
logical_tables: Vec<(TableId, TableName)>,
) -> ProcedureId {
) -> Result<ProcedureId> {
let procedure = ReconcileLogicalTablesProcedure::new(
self.context.clone(),
physical_table_id,
@@ -187,18 +189,18 @@ impl ReconciliationManager {
logical_tables,
false,
);
self.spawn_procedure(Box::new(procedure))
self.spawn_procedure(Box::new(procedure)).await
}
/// Reconcile a catalog.
///
/// Returns the procedure id of the reconciliation procedure.
pub fn reconcile_catalog(
pub async fn reconcile_catalog(
&self,
catalog: String,
resolve_strategy: ResolveStrategy,
parallelism: usize,
) -> ProcedureId {
) -> Result<ProcedureId> {
let parallelism = normalize_parallelism(parallelism);
let procedure = ReconcileCatalogProcedure::new(
self.context.clone(),
@@ -207,29 +209,26 @@ impl ReconciliationManager {
resolve_strategy,
parallelism,
);
self.spawn_procedure(Box::new(procedure))
self.spawn_procedure(Box::new(procedure)).await
}
fn spawn_procedure(&self, procedure: BoxedProcedure) -> ProcedureId {
async fn spawn_procedure(&self, procedure: BoxedProcedure) -> Result<ProcedureId> {
let procedure_manager = self.procedure_manager.clone();
let procedure_with_id = ProcedureWithId::with_random_id(procedure);
let procedure_id = procedure_with_id.id;
let mut watcher = procedure_manager
.submit(procedure_with_id)
.await
.context(error::SubmitProcedureSnafu)?;
common_runtime::spawn_global(async move {
let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
Err(e) => {
error!(e; "Failed to submit reconciliation procedure {procedure_id}");
return;
}
};
if let Err(e) = watcher::wait(watcher).await {
if let Err(e) = watcher::wait(&mut watcher).await {
error!(e; "Failed to wait reconciliation procedure {procedure_id}");
return;
}
info!("Reconciliation procedure {procedure_id} is finished successfully!");
});
procedure_id
Ok(procedure_id)
}
}