From 1065db95183c6302e5cd3f7409a1cbd322ced442 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 15 Jul 2025 14:06:27 +0800 Subject: [PATCH] fix: fix state transition in create table procedure (#6523) Signed-off-by: WenyXu --- src/common/datasource/src/object_store/oss.rs | 5 +++++ src/common/datasource/src/object_store/s3.rs | 5 +++++ src/common/meta/src/ddl/alter_table.rs | 3 +-- src/common/meta/src/ddl/create_table.rs | 17 ++++++++++------- src/common/meta/src/ddl/utils.rs | 1 + src/datanode/src/store.rs | 18 ++++-------------- src/object-store/src/util.rs | 17 ++++++++++++++--- 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/common/datasource/src/object_store/oss.rs b/src/common/datasource/src/object_store/oss.rs index 0aa583550a..c7e8eb5dd7 100644 --- a/src/common/datasource/src/object_store/oss.rs +++ b/src/common/datasource/src/object_store/oss.rs @@ -77,6 +77,11 @@ pub fn build_oss_backend( let op = ObjectStore::new(builder) .context(error::BuildBackendSnafu)? + .layer( + object_store::layers::RetryLayer::new() + .with_jitter() + .with_notify(object_store::util::PrintDetailedError), + ) .layer(object_store::layers::LoggingLayer::default()) .layer(object_store::layers::TracingLayer) .layer(object_store::layers::build_prometheus_metrics_layer(true)) diff --git a/src/common/datasource/src/object_store/s3.rs b/src/common/datasource/src/object_store/s3.rs index 0d83eb7a98..8f2e6464dd 100644 --- a/src/common/datasource/src/object_store/s3.rs +++ b/src/common/datasource/src/object_store/s3.rs @@ -85,6 +85,11 @@ pub fn build_s3_backend( // TODO(weny): Consider finding a better way to eliminate duplicate code. Ok(ObjectStore::new(builder) .context(error::BuildBackendSnafu)? + .layer( + object_store::layers::RetryLayer::new() + .with_jitter() + .with_notify(object_store::util::PrintDetailedError), + ) .layer(object_store::layers::LoggingLayer::new( DefaultLoggingInterceptor, )) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 1775bd2882..ac882cf9a9 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -227,7 +227,6 @@ impl AlterTableProcedure { } fn handle_alter_region_response(&mut self, mut results: Vec) -> Result<()> { - self.data.state = AlterTableState::UpdateMetadata; if let Some(column_metadatas) = extract_column_metadatas(&mut results, TABLE_COLUMN_METADATA_EXTENSION_KEY)? { @@ -235,7 +234,7 @@ impl AlterTableProcedure { } else { warn!("altering table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"); } - + self.data.state = AlterTableState::UpdateMetadata; Ok(()) } diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index e1e2649de5..45b7336229 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -21,7 +21,7 @@ use common_error::ext::BoxedError; use common_procedure::error::{ ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, }; -use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, ProcedureId, Status}; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{info, warn}; use futures::future::join_all; @@ -246,8 +246,6 @@ impl CreateTableProcedure { } } - self.creator.data.state = CreateTableState::CreateMetadata; - let mut results = join_all(create_region_tasks) .await .into_iter() @@ -261,6 +259,7 @@ impl CreateTableProcedure { warn!("creating table result doesn't contains extension key `{TABLE_COLUMN_METADATA_EXTENSION_KEY}`,leaving the table's column metadata unchanged"); } + self.creator.data.state = CreateTableState::CreateMetadata; Ok(Status::executing(true)) } @@ -268,8 +267,9 @@ impl CreateTableProcedure { /// /// Abort(not-retry): /// - Failed to create table metadata. - async fn on_create_metadata(&mut self) -> Result { + async fn on_create_metadata(&mut self, pid: ProcedureId) -> Result { let table_id = self.table_id(); + let table_ref = self.creator.data.table_ref(); let manager = &self.context.table_metadata_manager; let mut raw_table_info = self.table_info().clone(); @@ -289,7 +289,10 @@ impl CreateTableProcedure { self.context .register_failure_detectors(detecting_regions) .await; - info!("Created table metadata for table {table_id}"); + info!( + "Successfully created table: {}, table_id: {}, procedure_id: {}", + table_ref, table_id, pid + ); self.creator.opening_regions.clear(); Ok(Status::done_with_output(table_id)) @@ -317,7 +320,7 @@ impl Procedure for CreateTableProcedure { Ok(()) } - async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { let state = &self.creator.data.state; let _timer = metrics::METRIC_META_PROCEDURE_CREATE_TABLE @@ -327,7 +330,7 @@ impl Procedure for CreateTableProcedure { match state { CreateTableState::Prepare => self.on_prepare().await, CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await, - CreateTableState::CreateMetadata => self.on_create_metadata().await, + CreateTableState::CreateMetadata => self.on_create_metadata(ctx.procedure_id).await, } .map_err(map_to_procedure_error) } diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index c0e8bd96e0..a5cbfd9b77 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -446,6 +446,7 @@ pub fn extract_column_metadatas( .collect::>(); if schemas.is_empty() { + warn!("extract_column_metadatas: no extension key `{key}` found in results"); return Ok(None); } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index c6a0fc0b30..dd8f7c3ca2 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -16,15 +16,14 @@ use std::path::Path; use std::sync::Arc; -use std::time::Duration; -use common_telemetry::{info, warn}; +use common_telemetry::info; use object_store::factory::new_raw_object_store; -use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer}; +use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; -use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers}; +use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, PrintDetailedError}; use object_store::{ - Access, Error, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR, + Access, ObjectStore, ObjectStoreBuilder, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR, }; use snafu::prelude::*; @@ -176,12 +175,3 @@ async fn build_cache_layer( Ok(None) } } - -struct PrintDetailedError; - -// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying. -impl RetryInterceptor for PrintDetailedError { - fn intercept(&self, err: &Error, dur: Duration) { - warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err); - } -} diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index c4a13d72e9..6e8426a494 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -14,11 +14,12 @@ use std::fmt::Display; use std::path; +use std::time::Duration; -use common_telemetry::{debug, error, info, trace}; -use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer}; +use common_telemetry::{debug, error, info, trace, warn}; +use opendal::layers::{LoggingInterceptor, LoggingLayer, RetryInterceptor, TracingLayer}; use opendal::raw::{AccessorInfo, HttpClient, Operation}; -use opendal::ErrorKind; +use opendal::{Error, ErrorKind}; use snafu::ResultExt; use crate::config::HttpClientConfig; @@ -229,6 +230,16 @@ pub fn clean_temp_dir(dir: &str) -> error::Result<()> { Ok(()) } +/// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying. +pub struct PrintDetailedError; + +// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying. +impl RetryInterceptor for PrintDetailedError { + fn intercept(&self, err: &Error, dur: Duration) { + warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err); + } +} + #[cfg(test)] mod tests { use super::*;