diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs index d2e92f6e0e..b7e02c4edc 100644 --- a/src/metric-engine/src/engine/catchup.rs +++ b/src/metric-engine/src/engine/catchup.rs @@ -79,12 +79,7 @@ impl MetricEngineInner { .context(MitoCatchupOperationSnafu) .map(|response| response.affected_rows)?; - let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context( - PhysicalRegionNotFoundSnafu { - region_id: data_region_id, - }, - )?; - self.recover_states(region_id, primary_key_encoding, physical_region_options) + self.recover_states(region_id, physical_region_options) .await?; Ok(0) } diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 721cefee28..a1aeb4def5 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -14,14 +14,14 @@ //! Open a metric region. -use std::collections::HashSet; - +use api::region::RegionResponse; use api::v1::SemanticType; +use common_error::ext::BoxedError; use common_telemetry::info; +use datafusion::common::HashMap; use mito2::engine::MITO_ENGINE_NAME; use object_store::util::join_dir; use snafu::{OptionExt, ResultExt}; -use store_api::codec::PrimaryKeyEncoding; use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR}; use store_api::region_engine::{BatchResponses, RegionEngine}; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; @@ -31,7 +31,8 @@ use crate::engine::create::region_options_for_metadata_region; use crate::engine::options::{set_data_region_options, PhysicalRegionOptions}; use crate::engine::MetricEngineInner; use crate::error::{ - BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result, + BatchOpenMitoRegionSnafu, NoOpenRegionResultSnafu, OpenMitoRegionSnafu, + PhysicalRegionNotFoundSnafu, Result, }; use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT}; use crate::utils; @@ -44,8 +45,7 @@ impl MetricEngineInner { ) -> Result { // We need to open metadata region and data region for each request. let mut all_requests = Vec::with_capacity(requests.len() * 2); - let mut physical_region_ids = Vec::with_capacity(requests.len()); - let mut data_region_ids = HashSet::with_capacity(requests.len()); + let mut physical_region_ids = HashMap::with_capacity(requests.len()); for (region_id, request) in requests { if !request.is_physical_table() { @@ -58,35 +58,70 @@ impl MetricEngineInner { self.transform_open_physical_region_request(request); all_requests.push((metadata_region_id, open_metadata_region_request)); all_requests.push((data_region_id, open_data_region_request)); - physical_region_ids.push((region_id, physical_region_options)); - data_region_ids.insert(data_region_id); + physical_region_ids.insert(region_id, physical_region_options); } - let results = self + let mut results = self .mito .handle_batch_open_requests(parallelism, all_requests) .await .context(BatchOpenMitoRegionSnafu {})? .into_iter() - .filter(|(region_id, _)| data_region_ids.contains(region_id)) - .collect::>(); + .collect::>(); + let mut responses = Vec::with_capacity(physical_region_ids.len()); for (physical_region_id, physical_region_options) in physical_region_ids { - let primary_key_encoding = self - .mito - .get_primary_key_encoding(physical_region_id) - .context(PhysicalRegionNotFoundSnafu { - region_id: physical_region_id, - })?; - self.recover_states( - physical_region_id, - primary_key_encoding, - physical_region_options, - ) - .await?; + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); + let data_region_id = utils::to_data_region_id(physical_region_id); + let metadata_region_result = results.remove(&metadata_region_id); + let data_region_result = results.remove(&data_region_id); + // Pass the optional `metadata_region_result` and `data_region_result` to + // `open_physical_region_with_results`. This function handles errors for each + // open physical region request, allowing the process to continue with the + // remaining regions even if some requests fail. + let response = self + .open_physical_region_with_results( + metadata_region_result, + data_region_result, + physical_region_id, + physical_region_options, + ) + .await + .map_err(BoxedError::new); + responses.push((physical_region_id, response)); } - Ok(results) + Ok(responses) + } + + async fn open_physical_region_with_results( + &self, + metadata_region_result: Option>, + data_region_result: Option>, + physical_region_id: RegionId, + physical_region_options: PhysicalRegionOptions, + ) -> Result { + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); + let data_region_id = utils::to_data_region_id(physical_region_id); + let _ = metadata_region_result + .context(NoOpenRegionResultSnafu { + region_id: metadata_region_id, + })? + .context(OpenMitoRegionSnafu { + region_type: "metadata", + })?; + + let data_region_response = data_region_result + .context(NoOpenRegionResultSnafu { + region_id: data_region_id, + })? + .context(OpenMitoRegionSnafu { + region_type: "data", + })?; + + self.recover_states(physical_region_id, physical_region_options) + .await?; + Ok(data_region_response) } /// Open a metric region. @@ -107,13 +142,7 @@ impl MetricEngineInner { // open physical region and recover states let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?; self.open_physical_region(region_id, request).await?; - let data_region_id = utils::to_data_region_id(region_id); - let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context( - PhysicalRegionNotFoundSnafu { - region_id: data_region_id, - }, - )?; - self.recover_states(region_id, primary_key_encoding, physical_region_options) + self.recover_states(region_id, physical_region_options) .await?; Ok(0) @@ -208,7 +237,6 @@ impl MetricEngineInner { pub(crate) async fn recover_states( &self, physical_region_id: RegionId, - primary_key_encoding: PrimaryKeyEncoding, physical_region_options: PhysicalRegionOptions, ) -> Result> { // load logical regions and physical column names @@ -220,6 +248,12 @@ impl MetricEngineInner { .data_region .physical_columns(physical_region_id) .await?; + let primary_key_encoding = self + .mito + .get_primary_key_encoding(physical_region_id) + .context(PhysicalRegionNotFoundSnafu { + region_id: physical_region_id, + })?; { let mut state = self.state.write().unwrap(); diff --git a/src/metric-engine/src/engine/sync.rs b/src/metric-engine/src/engine/sync.rs index fe0d8ef6d0..a0ebf07ab8 100644 --- a/src/metric-engine/src/engine/sync.rs +++ b/src/metric-engine/src/engine/sync.rs @@ -87,17 +87,8 @@ impl MetricEngineInner { region_id: data_region_id, })? .options(); - let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context( - PhysicalRegionNotFoundSnafu { - region_id: data_region_id, - }, - )?; let new_opened_logical_region_ids = self - .recover_states( - data_region_id, - primary_key_encoding, - physical_region_options, - ) + .recover_states(data_region_id, physical_region_options) .await?; info!( "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}", diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 0ffd3c8bc6..d4fcb4e5b2 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -49,6 +49,13 @@ pub enum Error { location: Location, }, + #[snafu(display("No open region result for region {}", region_id))] + NoOpenRegionResult { + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to close mito region, region id: {}", region_id))] CloseMitoRegion { region_id: RegionId, @@ -328,7 +335,8 @@ impl ErrorExt for Error { | ParseRegionId { .. } | InvalidMetadata { .. } | SetSkippingIndexOption { .. } - | SerializeRegionManifestInfo { .. } => StatusCode::Unexpected, + | SerializeRegionManifestInfo { .. } + | NoOpenRegionResult { .. } => StatusCode::Unexpected, PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => { StatusCode::RegionNotFound