From 351c741c702dc3f32003bade3275b7558f31034c Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 7 Jul 2025 11:30:40 +0800 Subject: [PATCH] fix(metric-engine): handle stale metadata region recovery failures (#6395) * fix(metric-engine): handle stale metadata region recovery failures Signed-off-by: WenyXu * test: add unit tests Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/metric-engine/src/engine.rs | 89 ++++++++++++++++++++++++++- src/metric-engine/src/engine/close.rs | 2 +- src/metric-engine/src/engine/open.rs | 51 +++++++++++++-- src/metric-engine/src/test_util.rs | 5 ++ 4 files changed, 140 insertions(+), 7 deletions(-) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 827cb90992..161a851d29 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -477,8 +477,9 @@ struct MetricEngineInner { mod test { use std::collections::HashMap; + use common_telemetry::info; use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; - use store_api::region_request::{RegionCloseRequest, RegionOpenRequest}; + use store_api::region_request::{RegionCloseRequest, RegionFlushRequest, RegionOpenRequest}; use super::*; use crate::test_util::TestEnv; @@ -563,4 +564,90 @@ mod test { assert!(env.metric().region_statistic(logical_region_id).is_none()); assert!(env.metric().region_statistic(physical_region_id).is_some()); } + + #[tokio::test] + async fn test_open_region_failure() { + let env = TestEnv::new().await; + env.init_metric_region().await; + let physical_region_id = env.default_physical_region_id(); + + let metric_engine = env.metric(); + metric_engine + .handle_request( + physical_region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + let path = format!("{}/metadata/", env.default_region_dir()); + let object_store = env.get_object_store().unwrap(); + let list = object_store.list(&path).await.unwrap(); + // Delete parquet files in metadata region + for entry in list { + if entry.metadata().is_dir() { + continue; + } + if entry.name().ends_with("parquet") { + info!("deleting {}", entry.path()); + object_store.delete(entry.path()).await.unwrap(); + } + } + + let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(); + let open_request = RegionOpenRequest { + engine: METRIC_ENGINE_NAME.to_string(), + region_dir: env.default_region_dir(), + options: physical_region_option, + skip_wal_replay: false, + }; + // Opening an already opened region should succeed. + // Since the region is already open, no metadata recovery operations will be performed. + metric_engine + .handle_request(physical_region_id, RegionRequest::Open(open_request)) + .await + .unwrap(); + + // Close the region + metric_engine + .handle_request( + physical_region_id, + RegionRequest::Close(RegionCloseRequest {}), + ) + .await + .unwrap(); + + // Try to reopen region. + let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(); + let open_request = RegionOpenRequest { + engine: METRIC_ENGINE_NAME.to_string(), + region_dir: env.default_region_dir(), + options: physical_region_option, + skip_wal_replay: false, + }; + let err = metric_engine + .handle_request(physical_region_id, RegionRequest::Open(open_request)) + .await + .unwrap_err(); + // Failed to open region because of missing parquet files. + assert_eq!(err.status_code(), StatusCode::StorageUnavailable); + + let mito_engine = metric_engine.mito(); + let data_region_id = utils::to_data_region_id(physical_region_id); + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); + // The metadata/data region should be closed. + let err = mito_engine.get_metadata(data_region_id).await.unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + let err = mito_engine + .get_metadata(metadata_region_id) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::RegionNotFound); + } } diff --git a/src/metric-engine/src/engine/close.rs b/src/metric-engine/src/engine/close.rs index 78c45fc453..1af507cf36 100644 --- a/src/metric-engine/src/engine/close.rs +++ b/src/metric-engine/src/engine/close.rs @@ -59,7 +59,7 @@ impl MetricEngineInner { } } - async fn close_physical_region(&self, region_id: RegionId) -> Result { + pub(crate) async fn close_physical_region(&self, region_id: RegionId) -> Result { let data_region_id = utils::to_data_region_id(region_id); let metadata_region_id = utils::to_metadata_region_id(region_id); diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 4f13cb33b1..7237da2947 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -17,7 +17,7 @@ use api::region::RegionResponse; use api::v1::SemanticType; use common_error::ext::BoxedError; -use common_telemetry::info; +use common_telemetry::{error, info, warn}; use datafusion::common::HashMap; use mito2::engine::MITO_ENGINE_NAME; use object_store::util::join_dir; @@ -94,6 +94,21 @@ impl MetricEngineInner { Ok(responses) } + // If the metadata region is opened with a stale manifest, + // the metric engine may fail to recover logical tables from the metadata region, + // as the manifest could reference files that have already been deleted + // due to compaction operations performed by the region leader. + async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) { + info!( + "Closing metadata region {} and data region {} on metadata recovery failure", + utils::to_metadata_region_id(physical_region_id), + utils::to_data_region_id(physical_region_id) + ); + if let Err(err) = self.close_physical_region(physical_region_id).await { + error!(err; "Failed to close physical region {}", physical_region_id); + } + } + async fn open_physical_region_with_results( &self, metadata_region_result: Option>, @@ -119,8 +134,14 @@ impl MetricEngineInner { region_type: "data", })?; - self.recover_states(physical_region_id, physical_region_options) - .await?; + if let Err(err) = self + .recover_states(physical_region_id, physical_region_options) + .await + { + self.close_physical_region_on_recovery_failure(physical_region_id) + .await; + return Err(err); + } Ok(data_region_response) } @@ -139,11 +160,31 @@ impl MetricEngineInner { request: RegionOpenRequest, ) -> Result { if request.is_physical_table() { + if self + .state + .read() + .unwrap() + .physical_region_states() + .get(®ion_id) + .is_some() + { + warn!( + "The physical region {} is already open, ignore the open request", + region_id + ); + return Ok(0); + } // open physical region and recover states let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?; self.open_physical_region(region_id, request).await?; - self.recover_states(region_id, physical_region_options) - .await?; + if let Err(err) = self + .recover_states(region_id, physical_region_options) + .await + { + self.close_physical_region_on_recovery_failure(region_id) + .await; + return Err(err); + } Ok(0) } else { diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index dc16db4bb4..b8a7130e5a 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -23,6 +23,7 @@ use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use mito2::test_util::TestEnv as MitoTestEnv; use object_store::util::join_dir; +use object_store::ObjectStore; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, @@ -74,6 +75,10 @@ impl TestEnv { join_dir(&env_root, "data") } + pub fn get_object_store(&self) -> Option { + self.mito_env.get_object_store() + } + /// Returns a reference to the engine. pub fn mito(&self) -> MitoEngine { self.mito.clone()