mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 18:30:38 +00:00
feat: implement batch region opening in metric engine (#6017)
feat: implement batch open metric regions
This commit is contained in:
@@ -42,11 +42,11 @@ pub(crate) use state::MetricEngineState;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
|
||||
use store_api::region_engine::{
|
||||
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
|
||||
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
|
||||
SyncManifestResponse,
|
||||
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
|
||||
RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
|
||||
SettableRegionRoleState, SyncManifestResponse,
|
||||
};
|
||||
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
|
||||
use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
|
||||
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
|
||||
|
||||
use crate::config::EngineConfig;
|
||||
@@ -131,6 +131,17 @@ impl RegionEngine for MetricEngine {
|
||||
METRIC_ENGINE_NAME
|
||||
}
|
||||
|
||||
async fn handle_batch_open_requests(
|
||||
&self,
|
||||
parallelism: usize,
|
||||
requests: Vec<(RegionId, RegionOpenRequest)>,
|
||||
) -> Result<BatchResponses, BoxedError> {
|
||||
self.inner
|
||||
.handle_batch_open_requests(parallelism, requests)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
}
|
||||
|
||||
async fn handle_batch_ddl_requests(
|
||||
&self,
|
||||
batch_request: BatchRegionDdlRequest,
|
||||
|
||||
@@ -14,24 +14,80 @@
|
||||
|
||||
//! Open a metric region.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use common_telemetry::info;
|
||||
use mito2::engine::MITO_ENGINE_NAME;
|
||||
use object_store::util::join_dir;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_engine::{BatchResponses, RegionEngine};
|
||||
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::engine::create::region_options_for_metadata_region;
|
||||
use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
|
||||
use crate::engine::MetricEngineInner;
|
||||
use crate::error::{OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result};
|
||||
use crate::error::{
|
||||
BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
|
||||
};
|
||||
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
|
||||
use crate::utils;
|
||||
|
||||
impl MetricEngineInner {
|
||||
pub async fn handle_batch_open_requests(
|
||||
&self,
|
||||
parallelism: usize,
|
||||
requests: Vec<(RegionId, RegionOpenRequest)>,
|
||||
) -> Result<BatchResponses> {
|
||||
// We need to open metadata region and data region for each request.
|
||||
let mut all_requests = Vec::with_capacity(requests.len() * 2);
|
||||
let mut physical_region_ids = Vec::with_capacity(requests.len());
|
||||
let mut data_region_ids = HashSet::with_capacity(requests.len());
|
||||
|
||||
for (region_id, request) in requests {
|
||||
if !request.is_physical_table() {
|
||||
continue;
|
||||
}
|
||||
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
|
||||
let metadata_region_id = utils::to_metadata_region_id(region_id);
|
||||
let data_region_id = utils::to_data_region_id(region_id);
|
||||
let (open_metadata_region_request, open_data_region_request) =
|
||||
self.transform_open_physical_region_request(request);
|
||||
all_requests.push((metadata_region_id, open_metadata_region_request));
|
||||
all_requests.push((data_region_id, open_data_region_request));
|
||||
physical_region_ids.push((region_id, physical_region_options));
|
||||
data_region_ids.insert(data_region_id);
|
||||
}
|
||||
|
||||
let results = self
|
||||
.mito
|
||||
.handle_batch_open_requests(parallelism, all_requests)
|
||||
.await
|
||||
.context(BatchOpenMitoRegionSnafu {})?
|
||||
.into_iter()
|
||||
.filter(|(region_id, _)| data_region_ids.contains(region_id))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (physical_region_id, physical_region_options) in physical_region_ids {
|
||||
let primary_key_encoding = self
|
||||
.mito
|
||||
.get_primary_key_encoding(physical_region_id)
|
||||
.context(PhysicalRegionNotFoundSnafu {
|
||||
region_id: physical_region_id,
|
||||
})?;
|
||||
self.recover_states(
|
||||
physical_region_id,
|
||||
primary_key_encoding,
|
||||
physical_region_options,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Open a metric region.
|
||||
///
|
||||
/// Only open requests to a physical region matter. Those to logical regions are
|
||||
@@ -69,12 +125,15 @@ impl MetricEngineInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// Invokes mito engine to open physical regions (data and metadata).
|
||||
async fn open_physical_region(
|
||||
/// Transform the open request to open metadata region and data region.
|
||||
///
|
||||
/// Returns:
|
||||
/// - The open request for metadata region.
|
||||
/// - The open request for data region.
|
||||
fn transform_open_physical_region_request(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
request: RegionOpenRequest,
|
||||
) -> Result<AffectedRows> {
|
||||
) -> (RegionOpenRequest, RegionOpenRequest) {
|
||||
let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
|
||||
let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
|
||||
|
||||
@@ -98,8 +157,19 @@ impl MetricEngineInner {
|
||||
skip_wal_replay: request.skip_wal_replay,
|
||||
};
|
||||
|
||||
(open_metadata_region_request, open_data_region_request)
|
||||
}
|
||||
|
||||
/// Invokes mito engine to open physical regions (data and metadata).
|
||||
async fn open_physical_region(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
request: RegionOpenRequest,
|
||||
) -> Result<AffectedRows> {
|
||||
let metadata_region_id = utils::to_metadata_region_id(region_id);
|
||||
let data_region_id = utils::to_data_region_id(region_id);
|
||||
let (open_metadata_region_request, open_data_region_request) =
|
||||
self.transform_open_physical_region_request(request);
|
||||
|
||||
self.mito
|
||||
.handle_request(
|
||||
|
||||
@@ -42,6 +42,13 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to batch open mito region"))]
|
||||
BatchOpenMitoRegion {
|
||||
source: BoxedError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to close mito region, region id: {}", region_id))]
|
||||
CloseMitoRegion {
|
||||
region_id: RegionId,
|
||||
@@ -337,7 +344,8 @@ impl ErrorExt for Error {
|
||||
| MitoCatchupOperation { source, .. }
|
||||
| MitoFlushOperation { source, .. }
|
||||
| MitoDeleteOperation { source, .. }
|
||||
| MitoSyncOperation { source, .. } => source.status_code(),
|
||||
| MitoSyncOperation { source, .. }
|
||||
| BatchOpenMitoRegion { source, .. } => source.status_code(),
|
||||
|
||||
EncodePrimaryKey { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
Reference in New Issue
Block a user