fix(metric-engine): handle stale metadata region recovery failures (#6395)

* fix(metric-engine): handle stale metadata region recovery failures

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-07 11:30:40 +08:00
committed by GitHub
parent bb43d604a4
commit 351c741c70
4 changed files with 140 additions and 7 deletions

View File

@@ -477,8 +477,9 @@ struct MetricEngineInner {
mod test {
use std::collections::HashMap;
use common_telemetry::info;
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
use store_api::region_request::{RegionCloseRequest, RegionFlushRequest, RegionOpenRequest};
use super::*;
use crate::test_util::TestEnv;
@@ -563,4 +564,90 @@ mod test {
assert!(env.metric().region_statistic(logical_region_id).is_none());
assert!(env.metric().region_statistic(physical_region_id).is_some());
}
#[tokio::test]
async fn test_open_region_failure() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let physical_region_id = env.default_physical_region_id();
let metric_engine = env.metric();
metric_engine
.handle_request(
physical_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
let path = format!("{}/metadata/", env.default_region_dir());
let object_store = env.get_object_store().unwrap();
let list = object_store.list(&path).await.unwrap();
// Delete parquet files in metadata region
for entry in list {
if entry.metadata().is_dir() {
continue;
}
if entry.name().ends_with("parquet") {
info!("deleting {}", entry.path());
object_store.delete(entry.path()).await.unwrap();
}
}
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
// Opening an already opened region should succeed.
// Since the region is already open, no metadata recovery operations will be performed.
metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap();
// Close the region
metric_engine
.handle_request(
physical_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap();
// Try to reopen region.
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
let err = metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap_err();
// Failed to open region because of missing parquet files.
assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
let mito_engine = metric_engine.mito();
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
// The metadata/data region should be closed.
let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotFound);
let err = mito_engine
.get_metadata(metadata_region_id)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotFound);
}
}

View File

@@ -59,7 +59,7 @@ impl MetricEngineInner {
}
}
async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
pub(crate) async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
let metadata_region_id = utils::to_metadata_region_id(region_id);

View File

@@ -17,7 +17,7 @@
use api::region::RegionResponse;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::info;
use common_telemetry::{error, info, warn};
use datafusion::common::HashMap;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
@@ -94,6 +94,21 @@ impl MetricEngineInner {
Ok(responses)
}
// If the metadata region is opened with a stale manifest,
// the metric engine may fail to recover logical tables from the metadata region,
// as the manifest could reference files that have already been deleted
// due to compaction operations performed by the region leader.
async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
info!(
"Closing metadata region {} and data region {} on metadata recovery failure",
utils::to_metadata_region_id(physical_region_id),
utils::to_data_region_id(physical_region_id)
);
if let Err(err) = self.close_physical_region(physical_region_id).await {
error!(err; "Failed to close physical region {}", physical_region_id);
}
}
async fn open_physical_region_with_results(
&self,
metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
@@ -119,8 +134,14 @@ impl MetricEngineInner {
region_type: "data",
})?;
self.recover_states(physical_region_id, physical_region_options)
.await?;
if let Err(err) = self
.recover_states(physical_region_id, physical_region_options)
.await
{
self.close_physical_region_on_recovery_failure(physical_region_id)
.await;
return Err(err);
}
Ok(data_region_response)
}
@@ -139,11 +160,31 @@ impl MetricEngineInner {
request: RegionOpenRequest,
) -> Result<AffectedRows> {
if request.is_physical_table() {
if self
.state
.read()
.unwrap()
.physical_region_states()
.get(&region_id)
.is_some()
{
warn!(
"The physical region {} is already open, ignore the open request",
region_id
);
return Ok(0);
}
// open physical region and recover states
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id, physical_region_options)
.await?;
if let Err(err) = self
.recover_states(region_id, physical_region_options)
.await
{
self.close_physical_region_on_recovery_failure(region_id)
.await;
return Err(err);
}
Ok(0)
} else {

View File

@@ -23,6 +23,7 @@ use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
@@ -74,6 +75,10 @@ impl TestEnv {
join_dir(&env_root, "data")
}
pub fn get_object_store(&self) -> Option<ObjectStore> {
self.mito_env.get_object_store()
}
/// Returns a reference to the engine.
pub fn mito(&self) -> MitoEngine {
self.mito.clone()