fix: fix state transition in create table procedure (#6523)

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-15 14:06:27 +08:00
committed by GitHub
parent 2f9a10ab74
commit 1065db9518
7 changed files with 40 additions and 26 deletions

View File

@@ -77,6 +77,11 @@ pub fn build_oss_backend(
let op = ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
.layer(
object_store::layers::RetryLayer::new()
.with_jitter()
.with_notify(object_store::util::PrintDetailedError),
)
.layer(object_store::layers::LoggingLayer::default())
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::build_prometheus_metrics_layer(true))

View File

@@ -85,6 +85,11 @@ pub fn build_s3_backend(
// TODO(weny): Consider finding a better way to eliminate duplicate code.
Ok(ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
.layer(
object_store::layers::RetryLayer::new()
.with_jitter()
.with_notify(object_store::util::PrintDetailedError),
)
.layer(object_store::layers::LoggingLayer::new(
DefaultLoggingInterceptor,
))

View File

@@ -227,7 +227,6 @@ impl AlterTableProcedure {
}
fn handle_alter_region_response(&mut self, mut results: Vec<RegionResponse>) -> Result<()> {
self.data.state = AlterTableState::UpdateMetadata;
if let Some(column_metadatas) =
extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)?
{
@@ -235,7 +234,7 @@ impl AlterTableProcedure {
} else {
warn!("altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged");
}
self.data.state = AlterTableState::UpdateMetadata;
Ok(())
}

View File

@@ -21,7 +21,7 @@ use common_error::ext::BoxedError;
use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{info, warn};
use futures::future::join_all;
@@ -246,8 +246,6 @@ impl CreateTableProcedure {
}
}
self.creator.data.state = CreateTableState::CreateMetadata;
let mut results = join_all(create_region_tasks)
.await
.into_iter()
@@ -261,6 +259,7 @@ impl CreateTableProcedure {
warn!("creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged");
}
self.creator.data.state = CreateTableState::CreateMetadata;
Ok(Status::executing(true))
}
@@ -268,8 +267,9 @@ impl CreateTableProcedure {
///
/// Abort(not-retry):
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self) -> Result<Status> {
async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result<Status> {
let table_id = self.table_id();
let table_ref = self.creator.data.table_ref();
let manager = &self.context.table_metadata_manager;
let mut raw_table_info = self.table_info().clone();
@@ -289,7 +289,10 @@ impl CreateTableProcedure {
self.context
.register_failure_detectors(detecting_regions)
.await;
info!("Created table metadata for table {table_id}");
info!(
"Successfully created table: {}, table_id: {}, procedure_id: {}",
table_ref, table_id, pid
);
self.creator.opening_regions.clear();
Ok(Status::done_with_output(table_id))
@@ -317,7 +320,7 @@ impl Procedure for CreateTableProcedure {
Ok(())
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE
@@ -327,7 +330,7 @@ impl Procedure for CreateTableProcedure {
match state {
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
CreateTableState::CreateMetadata => self.on_create_metadata().await,
CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await,
}
.map_err(map_to_procedure_error)
}

View File

@@ -446,6 +446,7 @@ pub fn extract_column_metadatas(
.collect::<Vec<_>>();
if schemas.is_empty() {
warn!("extract_column_metadatas: no extension key `{key}` found in results");
return Ok(None);
}

View File

@@ -16,15 +16,14 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::{info, warn};
use common_telemetry::info;
use object_store::factory::new_raw_object_store;
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs;
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers};
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, PrintDetailedError};
use object_store::{
Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
Access, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR,
};
use snafu::prelude::*;
@@ -176,12 +175,3 @@ async fn build_cache_layer(
Ok(None)
}
}
struct PrintDetailedError;
// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
impl RetryInterceptor for PrintDetailedError {
fn intercept(&self, err: &Error, dur: Duration) {
warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
}
}

View File

@@ -14,11 +14,12 @@
use std::fmt::Display;
use std::path;
use std::time::Duration;
use common_telemetry::{debug, error, info, trace};
use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
use common_telemetry::{debug, error, info, trace, warn};
use opendal::layers::{LoggingInterceptor, LoggingLayer, RetryInterceptor, TracingLayer};
use opendal::raw::{AccessorInfo, HttpClient, Operation};
use opendal::ErrorKind;
use opendal::{Error, ErrorKind};
use snafu::ResultExt;
use crate::config::HttpClientConfig;
@@ -229,6 +230,16 @@ pub fn clean_temp_dir(dir: &str) -> error::Result<()> {
Ok(())
}
/// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
pub struct PrintDetailedError;
// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
impl RetryInterceptor for PrintDetailedError {
fn intercept(&self, err: &Error, dur: Duration) {
warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
}
}
#[cfg(test)]
mod tests {
use super::*;