fix(metric-engine): properly propagate errors during batch open operation (#6325)

* fix(metric-engine): properly propagate errors during batch open operation

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-06-19 14:37:54 +08:00
committed by GitHub
parent 6ece560f8c
commit 5231505021
4 changed files with 77 additions and 49 deletions

View File

@@ -79,12 +79,7 @@ impl MetricEngineInner {
.context(MitoCatchupOperationSnafu)
.map(|response| response.affected_rows)?;
let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
self.recover_states(region_id, primary_key_encoding, physical_region_options)
self.recover_states(region_id, physical_region_options)
.await?;
Ok(0)
}

View File

@@ -14,14 +14,14 @@
//! Open a metric region.
use std::collections::HashSet;
use api::region::RegionResponse;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::info;
use datafusion::common::HashMap;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use store_api::region_engine::{BatchResponses, RegionEngine};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
@@ -31,7 +31,8 @@ use crate::engine::create::region_options_for_metadata_region;
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
use crate::engine::MetricEngineInner;
use crate::error::{
BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu,
PhysicalRegionNotFoundSnafu, Result,
};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;
@@ -44,8 +45,7 @@ impl MetricEngineInner {
) -> Result<BatchResponses> {
// We need to open metadata region and data region for each request.
let mut all_requests = Vec::with_capacity(requests.len() * 2);
let mut physical_region_ids = Vec::with_capacity(requests.len());
let mut data_region_ids = HashSet::with_capacity(requests.len());
let mut physical_region_ids = HashMap::with_capacity(requests.len());
for (region_id, request) in requests {
if !request.is_physical_table() {
@@ -58,35 +58,70 @@ impl MetricEngineInner {
self.transform_open_physical_region_request(request);
all_requests.push((metadata_region_id, open_metadata_region_request));
all_requests.push((data_region_id, open_data_region_request));
physical_region_ids.push((region_id, physical_region_options));
data_region_ids.insert(data_region_id);
physical_region_ids.insert(region_id, physical_region_options);
}
let results = self
let mut results = self
.mito
.handle_batch_open_requests(parallelism, all_requests)
.await
.context(BatchOpenMitoRegionSnafu {})?
.into_iter()
.filter(|(region_id, _)| data_region_ids.contains(region_id))
.collect::<Vec<_>>();
.collect::<HashMap<_, _>>();
let mut responses = Vec::with_capacity(physical_region_ids.len());
for (physical_region_id, physical_region_options) in physical_region_ids {
let primary_key_encoding = self
.mito
.get_primary_key_encoding(physical_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?;
self.recover_states(
physical_region_id,
primary_key_encoding,
physical_region_options,
)
.await?;
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_result = results.remove(&metadata_region_id);
let data_region_result = results.remove(&data_region_id);
// Pass the optional `metadata_region_result` and `data_region_result` to
// `open_physical_region_with_results`. This function handles errors for each
// open physical region request, allowing the process to continue with the
// remaining regions even if some requests fail.
let response = self
.open_physical_region_with_results(
metadata_region_result,
data_region_result,
physical_region_id,
physical_region_options,
)
.await
.map_err(BoxedError::new);
responses.push((physical_region_id, response));
}
Ok(results)
Ok(responses)
}
/// Finishes opening one physical region from the per-region results of a batch open.
///
/// The metadata region result is examined first, then the data region result. For
/// each of them, a missing entry (`None`) is reported as `NoOpenRegionResult` carrying
/// the corresponding region id, and a failed open is wrapped as `OpenMitoRegion`
/// tagged with the region type (`"metadata"` or `"data"`). Only when both checks pass
/// is `recover_states` invoked for the physical region.
///
/// On success, returns the response obtained from opening the data region; the
/// metadata region's response is discarded.
async fn open_physical_region_with_results(
    &self,
    metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
    data_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
    physical_region_id: RegionId,
    physical_region_options: PhysicalRegionOptions,
) -> Result<RegionResponse> {
    let meta_id = utils::to_metadata_region_id(physical_region_id);
    let data_id = utils::to_data_region_id(physical_region_id);

    // The metadata region is validated before the data region, so when both are
    // missing or failed, the metadata region's error is the one returned.
    let metadata_open_outcome =
        metadata_region_result.context(NoOpenRegionResultSnafu { region_id: meta_id })?;
    let _ = metadata_open_outcome.context(OpenMitoRegionSnafu {
        region_type: "metadata",
    })?;

    let data_open_outcome =
        data_region_result.context(NoOpenRegionResultSnafu { region_id: data_id })?;
    let opened_data_region = data_open_outcome.context(OpenMitoRegionSnafu {
        region_type: "data",
    })?;

    // State recovery runs only after both underlying regions opened successfully.
    self.recover_states(physical_region_id, physical_region_options)
        .await?;

    Ok(opened_data_region)
}
/// Open a metric region.
@@ -107,13 +142,7 @@ impl MetricEngineInner {
// open physical region and recover states
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
self.open_physical_region(region_id, request).await?;
let data_region_id = utils::to_data_region_id(region_id);
let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
self.recover_states(region_id, primary_key_encoding, physical_region_options)
self.recover_states(region_id, physical_region_options)
.await?;
Ok(0)
@@ -208,7 +237,6 @@ impl MetricEngineInner {
pub(crate) async fn recover_states(
&self,
physical_region_id: RegionId,
primary_key_encoding: PrimaryKeyEncoding,
physical_region_options: PhysicalRegionOptions,
) -> Result<Vec<RegionId>> {
// load logical regions and physical column names
@@ -220,6 +248,12 @@ impl MetricEngineInner {
.data_region
.physical_columns(physical_region_id)
.await?;
let primary_key_encoding = self
.mito
.get_primary_key_encoding(physical_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?;
{
let mut state = self.state.write().unwrap();

View File

@@ -87,17 +87,8 @@ impl MetricEngineInner {
region_id: data_region_id,
})?
.options();
let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
let new_opened_logical_region_ids = self
.recover_states(
data_region_id,
primary_key_encoding,
physical_region_options,
)
.recover_states(data_region_id, physical_region_options)
.await?;
info!(
"Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",

View File

@@ -49,6 +49,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("No open region result for region {}", region_id))]
NoOpenRegionResult {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to close mito region, region id: {}", region_id))]
CloseMitoRegion {
region_id: RegionId,
@@ -328,7 +335,8 @@ impl ErrorExt for Error {
| ParseRegionId { .. }
| InvalidMetadata { .. }
| SetSkippingIndexOption { .. }
| SerializeRegionManifestInfo { .. } => StatusCode::Unexpected,
| SerializeRegionManifestInfo { .. }
| NoOpenRegionResult { .. } => StatusCode::Unexpected,
PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => {
StatusCode::RegionNotFound