diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 15caa2c456..680f4064ca 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -42,11 +42,11 @@ pub(crate) use state::MetricEngineState; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ - RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, - SyncManifestResponse, + BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, + RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, + SettableRegionRoleState, SyncManifestResponse, }; -use store_api::region_request::{BatchRegionDdlRequest, RegionRequest}; +use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use crate::config::EngineConfig; @@ -131,6 +131,17 @@ impl RegionEngine for MetricEngine { METRIC_ENGINE_NAME } + async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result { + self.inner + .handle_batch_open_requests(parallelism, requests) + .await + .map_err(BoxedError::new) + } + async fn handle_batch_ddl_requests( &self, batch_request: BatchRegionDdlRequest, diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 4b25cf38f2..82d861ea25 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -14,24 +14,80 @@ //! Open a metric region. +use std::collections::HashSet; + use common_telemetry::info; use mito2::engine::MITO_ENGINE_NAME; use object_store::util::join_dir; use snafu::{OptionExt, ResultExt}; use store_api::codec::PrimaryKeyEncoding; use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR}; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{BatchResponses, RegionEngine}; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; use crate::engine::create::region_options_for_metadata_region; use crate::engine::options::{set_data_region_options, PhysicalRegionOptions}; use crate::engine::MetricEngineInner; -use crate::error::{OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result}; +use crate::error::{ + BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result, +}; use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT}; use crate::utils; impl MetricEngineInner { + pub async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result { + // We need to open metadata region and data region for each request. + let mut all_requests = Vec::with_capacity(requests.len() * 2); + let mut physical_region_ids = Vec::with_capacity(requests.len()); + let mut data_region_ids = HashSet::with_capacity(requests.len()); + + for (region_id, request) in requests { + if !request.is_physical_table() { + continue; + } + let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?; + let metadata_region_id = utils::to_metadata_region_id(region_id); + let data_region_id = utils::to_data_region_id(region_id); + let (open_metadata_region_request, open_data_region_request) = + self.transform_open_physical_region_request(request); + all_requests.push((metadata_region_id, open_metadata_region_request)); + all_requests.push((data_region_id, open_data_region_request)); + physical_region_ids.push((region_id, physical_region_options)); + data_region_ids.insert(data_region_id); + } + + let results = self + .mito + .handle_batch_open_requests(parallelism, all_requests) + .await + .context(BatchOpenMitoRegionSnafu {})? + .into_iter() + .filter(|(region_id, _)| data_region_ids.contains(region_id)) + .collect::>(); + + for (physical_region_id, physical_region_options) in physical_region_ids { + let primary_key_encoding = self + .mito + .get_primary_key_encoding(physical_region_id) + .context(PhysicalRegionNotFoundSnafu { + region_id: physical_region_id, + })?; + self.recover_states( + physical_region_id, + primary_key_encoding, + physical_region_options, + ) + .await?; + } + + Ok(results) + } + /// Open a metric region. /// /// Only open requests to a physical region matter. Those to logical regions are @@ -69,12 +125,15 @@ impl MetricEngineInner { } } - /// Invokes mito engine to open physical regions (data and metadata). - async fn open_physical_region( + /// Transform the open request to open metadata region and data region. + /// + /// Returns: + /// - The open request for metadata region. + /// - The open request for data region. + fn transform_open_physical_region_request( &self, - region_id: RegionId, request: RegionOpenRequest, - ) -> Result { + ) -> (RegionOpenRequest, RegionOpenRequest) { let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR); let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR); @@ -98,8 +157,19 @@ impl MetricEngineInner { skip_wal_replay: request.skip_wal_replay, }; + (open_metadata_region_request, open_data_region_request) + } + + /// Invokes mito engine to open physical regions (data and metadata). + async fn open_physical_region( + &self, + region_id: RegionId, + request: RegionOpenRequest, + ) -> Result { let metadata_region_id = utils::to_metadata_region_id(region_id); let data_region_id = utils::to_data_region_id(region_id); + let (open_metadata_region_request, open_data_region_request) = + self.transform_open_physical_region_request(request); self.mito .handle_request( diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index c0ae55e402..1b390a45ba 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -42,6 +42,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to batch open mito region"))] + BatchOpenMitoRegion { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to close mito region, region id: {}", region_id))] CloseMitoRegion { region_id: RegionId, @@ -337,7 +344,8 @@ impl ErrorExt for Error { | MitoCatchupOperation { source, .. } | MitoFlushOperation { source, .. } | MitoDeleteOperation { source, .. } - | MitoSyncOperation { source, .. } => source.status_code(), + | MitoSyncOperation { source, .. } + | BatchOpenMitoRegion { source, .. } => source.status_code(), EncodePrimaryKey { source, .. } => source.status_code(),