feat(metric-engine): support sync logical regions from source region (#7438)

* chore: move file

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(metric-engine): support sync logical regions from source region

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-12-25 17:06:58 +08:00
committed by GitHub
parent be530ac1de
commit 294f19fa1d
18 changed files with 975 additions and 424 deletions

1
Cargo.lock generated
View File

@@ -7622,6 +7622,7 @@ dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"chrono",
"common-base",
"common-error",
"common-macro",

View File

@@ -66,7 +66,7 @@ use store_api::metric_engine_consts::{
};
use store_api::region_engine::{
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
SettableRegionRoleState, SyncRegionFromRequest,
};
use store_api::region_request::{
AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
@@ -536,10 +536,13 @@ impl RegionServer {
let tracing_context = TracingContext::from_current_span();
let span = tracing_context.attach(info_span!("RegionServer::handle_sync_region_request"));
self.sync_region(region_id, manifest_info)
.trace(span)
.await
.map(|_| RegionResponse::new(AffectedRows::default()))
self.sync_region(
region_id,
SyncRegionFromRequest::from_manifest(manifest_info),
)
.trace(span)
.await
.map(|_| RegionResponse::new(AffectedRows::default()))
}
/// Handles the ListMetadata request and retrieves metadata for specified regions.
@@ -588,7 +591,7 @@ impl RegionServer {
pub async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
request: SyncRegionFromRequest,
) -> Result<()> {
let engine_with_status = self
.inner
@@ -597,7 +600,7 @@ impl RegionServer {
.with_context(|| RegionNotFoundSnafu { region_id })?;
self.inner
.handle_sync_region(engine_with_status.engine(), region_id, manifest_info)
.handle_sync_region(engine_with_status.engine(), region_id, request)
.await
}
@@ -1269,10 +1272,10 @@ impl RegionServerInner {
&self,
engine: &RegionEngineRef,
region_id: RegionId,
manifest_info: RegionManifestInfo,
request: SyncRegionFromRequest,
) -> Result<()> {
let Some(new_opened_regions) = engine
.sync_region(region_id, manifest_info)
.sync_region(region_id, request)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })?
.new_opened_logical_region_ids()

View File

@@ -33,9 +33,9 @@ use servers::grpc::FlightCompression;
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest, SyncRegionFromResponse,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -287,8 +287,8 @@ impl RegionEngine for MockRegionEngine {
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse, BoxedError> {
_request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
unimplemented!()
}
@@ -299,14 +299,6 @@ impl RegionEngine for MockRegionEngine {
unimplemented!()
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -26,10 +26,9 @@ use object_store::ObjectStore;
use snafu::{OptionExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SinglePartitionScanner, SyncManifestResponse,
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SinglePartitionScanner, SyncRegionFromRequest, SyncRegionFromResponse,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
@@ -145,10 +144,10 @@ impl RegionEngine for FileRegionEngine {
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse, BoxedError> {
_request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
// File engine doesn't need to sync region manifest.
Ok(SyncManifestResponse::NotSupported)
Ok(SyncRegionFromResponse::NotSupported)
}
async fn remap_manifests(
@@ -163,19 +162,6 @@ impl RegionEngine for FileRegionEngine {
))
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
Err(BoxedError::new(
UnsupportedSnafu {
operation: "copy_region_from",
}
.build(),
))
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.state(region_id)
}

View File

@@ -23,6 +23,7 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
chrono.workspace = true
datafusion.workspace = true
datatypes.workspace = true
futures-util.workspace = true

View File

@@ -43,10 +43,10 @@ pub(crate) use state::MetricEngineState;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine,
RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState, SyncManifestResponse,
BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SetRegionRoleStateSuccess, SettableRegionRoleState, SyncRegionFromRequest,
SyncRegionFromResponse,
};
use store_api::region_request::{
BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
@@ -361,12 +361,30 @@ impl RegionEngine for MetricEngine {
async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse, BoxedError> {
self.inner
.sync_region(region_id, manifest_info)
.await
.map_err(BoxedError::new)
request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
match request {
SyncRegionFromRequest::FromManifest(manifest_info) => self
.inner
.sync_region_from_manifest(region_id, manifest_info)
.await
.map_err(BoxedError::new),
SyncRegionFromRequest::FromRegion {
source_region_id,
parallelism,
} => {
if self.inner.is_physical_region(region_id) {
self.inner
.sync_region_from_region(region_id, source_region_id, parallelism)
.await
.map_err(BoxedError::new)
} else {
Err(BoxedError::new(
error::UnsupportedSyncRegionFromRequestSnafu { region_id }.build(),
))
}
}
}
}
async fn remap_manifests(
@@ -383,14 +401,6 @@ impl RegionEngine for MetricEngine {
}
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
todo!()
}
async fn set_region_role_state_gracefully(
&self,
region_id: RegionId,

View File

@@ -290,6 +290,11 @@ impl MetricEngineInner {
.metadata_region
.logical_regions(physical_region_id)
.await?;
common_telemetry::debug!(
"Recover states for physical region {}, logical regions: {:?}",
physical_region_id,
logical_regions
);
let physical_columns = self
.data_region
.physical_columns(physical_region_id)

View File

@@ -12,242 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_telemetry::info;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse};
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{
MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::utils;
impl MetricEngineInner {
pub async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse> {
ensure!(
manifest_info.is_metric(),
MetricManifestInfoSnafu { region_id }
);
let metadata_region_id = utils::to_metadata_region_id(region_id);
// checked by ensure above
let metadata_manifest_version = manifest_info
.metadata_manifest_version()
.unwrap_or_default();
let metadata_flushed_entry_id = manifest_info
.metadata_flushed_entry_id()
.unwrap_or_default();
let metadata_region_manifest =
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0);
let metadata_synced = self
.mito
.sync_region(metadata_region_id, metadata_region_manifest)
.await
.context(MitoSyncOperationSnafu)?
.is_data_synced();
let data_region_id = utils::to_data_region_id(region_id);
let data_manifest_version = manifest_info.data_manifest_version();
let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
let data_region_manifest =
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0);
let data_synced = self
.mito
.sync_region(data_region_id, data_region_manifest)
.await
.context(MitoSyncOperationSnafu)?
.is_data_synced();
if !metadata_synced {
return Ok(SyncManifestResponse::Metric {
metadata_synced,
data_synced,
new_opened_logical_region_ids: vec![],
});
}
let now = Instant::now();
// Recovers the states from the metadata region
// if the metadata manifest version is updated.
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();
let new_opened_logical_region_ids = self
.recover_states(data_region_id, physical_region_options)
.await?;
info!(
"Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
data_region_id,
now.elapsed(),
new_opened_logical_region_ids
);
Ok(SyncManifestResponse::Metric {
metadata_synced,
data_synced,
new_opened_logical_region_ids,
})
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::SemanticType;
use common_query::prelude::greptime_timestamp;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::{RegionEngine, RegionManifestInfo};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::metadata_region::MetadataRegion;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_sync_region_with_new_created_logical_regions() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
env.init_metric_region().await;
info!("creating follower engine");
// Create a follower engine.
let (_follower_mito, follower_metric) = env.create_follower_engine().await;
let physical_region_id = env.default_physical_region_id();
// Flushes the physical region
let metric_engine = env.metric();
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
let response = follower_metric
.sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]);
// Sync again, no new logical region should be opened
let response = follower_metric
.sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
}
fn test_alter_logical_region_request() -> RegionAlterRequest {
RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag1",
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
}],
},
}
}
#[tokio::test]
async fn test_sync_region_alter_alter_logical_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
env.init_metric_region().await;
info!("creating follower engine");
let physical_region_id = env.default_physical_region_id();
// Flushes the physical region
let metric_engine = env.metric();
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
// Create a follower engine.
let (follower_mito, follower_metric) = env.create_follower_engine().await;
let metric_engine = env.metric();
let engine_inner = env.metric().inner;
let region_id = env.default_logical_region_id();
let request = test_alter_logical_region_request();
engine_inner
.alter_logical_regions(
physical_region_id,
vec![(region_id, request)],
&mut HashMap::new(),
)
.await
.unwrap();
// Flushes the physical region
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
// Sync the follower engine
let response = follower_metric
.sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0))
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
let logical_region_id = env.default_logical_region_id();
let metadata_region = MetadataRegion::new(follower_mito.clone());
let semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "tag1")
.await
.unwrap()
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
.await
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
}
}
mod manifest;
mod region;

View File

@@ -0,0 +1,268 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_telemetry::info;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncRegionFromResponse};
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{
MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::utils;
impl MetricEngineInner {
/// Syncs the region from the given manifest information (leader-follower scenario).
///
/// This operation:
/// 1. Syncs the metadata region manifest to the target version.
/// 2. Syncs the data region manifest to the target version.
/// 3. Recovers states and returns newly opened logical regions (if metadata was synced)
pub async fn sync_region_from_manifest(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<SyncRegionFromResponse> {
ensure!(
manifest_info.is_metric(),
MetricManifestInfoSnafu { region_id }
);
let metadata_region_id = utils::to_metadata_region_id(region_id);
// checked by ensure above
let metadata_manifest_version = manifest_info
.metadata_manifest_version()
.unwrap_or_default();
let metadata_flushed_entry_id = manifest_info
.metadata_flushed_entry_id()
.unwrap_or_default();
let metadata_region_manifest =
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0);
let metadata_synced = self
.mito
.sync_region(metadata_region_id, metadata_region_manifest.into())
.await
.context(MitoSyncOperationSnafu)?
.is_data_synced();
let data_region_id = utils::to_data_region_id(region_id);
let data_manifest_version = manifest_info.data_manifest_version();
let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
let data_region_manifest =
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0);
let data_synced = self
.mito
.sync_region(data_region_id, data_region_manifest.into())
.await
.context(MitoSyncOperationSnafu)?
.is_data_synced();
if !metadata_synced {
return Ok(SyncRegionFromResponse::Metric {
metadata_synced,
data_synced,
new_opened_logical_region_ids: vec![],
});
}
let now = Instant::now();
// Recovers the states from the metadata region
// if the metadata manifest version is updated.
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();
let new_opened_logical_region_ids = self
.recover_states(data_region_id, physical_region_options)
.await?;
info!(
"Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
data_region_id,
now.elapsed(),
new_opened_logical_region_ids
);
Ok(SyncRegionFromResponse::Metric {
metadata_synced,
data_synced,
new_opened_logical_region_ids,
})
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::SemanticType;
use common_query::prelude::greptime_timestamp;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::{RegionEngine, RegionManifestInfo};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::metadata_region::MetadataRegion;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_sync_region_with_new_created_logical_regions() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
env.init_metric_region().await;
info!("creating follower engine");
// Create a follower engine.
let (_follower_mito, follower_metric) = env.create_follower_engine().await;
let physical_region_id = env.default_physical_region_id();
// Flushes the physical region
let metric_engine = env.metric();
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
let response = follower_metric
.sync_region(
physical_region_id,
RegionManifestInfo::metric(1, 0, 1, 0).into(),
)
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]);
// Sync again, no new logical region should be opened
let response = follower_metric
.sync_region(
physical_region_id,
RegionManifestInfo::metric(1, 0, 1, 0).into(),
)
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
}
fn test_alter_logical_region_request() -> RegionAlterRequest {
RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag1",
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
}],
},
}
}
#[tokio::test]
async fn test_sync_region_alter_alter_logical_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
env.init_metric_region().await;
info!("creating follower engine");
let physical_region_id = env.default_physical_region_id();
// Flushes the physical region
let metric_engine = env.metric();
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
// Create a follower engine.
let (follower_mito, follower_metric) = env.create_follower_engine().await;
let metric_engine = env.metric();
let engine_inner = env.metric().inner;
let region_id = env.default_logical_region_id();
let request = test_alter_logical_region_request();
engine_inner
.alter_logical_regions(
physical_region_id,
vec![(region_id, request)],
&mut HashMap::new(),
)
.await
.unwrap();
// Flushes the physical region
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
// Sync the follower engine
let response = follower_metric
.sync_region(
physical_region_id,
RegionManifestInfo::metric(2, 0, 2, 0).into(),
)
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
let logical_region_id = env.default_logical_region_id();
let metadata_region = MetadataRegion::new(follower_mito.clone());
let semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "tag1")
.await
.unwrap()
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
.await
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
}
}

View File

@@ -0,0 +1,386 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_error::ext::BoxedError;
use common_telemetry::info;
use mito2::manifest::action::RegionEdit;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::region_engine::{MitoCopyRegionFromRequest, SyncRegionFromResponse};
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{
MissingFilesSnafu, MitoCopyRegionFromOperationSnafu, MitoEditRegionSnafu,
PhysicalRegionNotFoundSnafu, Result,
};
use crate::utils;
impl MetricEngineInner {
/// Syncs the logical regions from the source region to the target region in the metric engine.
///
/// This operation:
/// 1. Copies SST files from source metadata region to target metadata region
/// 2. Transforms logical region metadata (updates region numbers to match target)
/// 3. Edits target manifest to remove old file entries (copied files)
/// 4. Recovers states and returns newly opened logical region IDs
///
/// **Note**: Only the metadata region is synced. The data region is not affected.
pub(crate) async fn sync_region_from_region(
&self,
region_id: RegionId,
source_region_id: RegionId,
parallelism: usize,
) -> Result<SyncRegionFromResponse> {
let source_metadata_region_id = utils::to_metadata_region_id(source_region_id);
let target_metadata_region_id = utils::to_metadata_region_id(region_id);
let target_data_region_id = utils::to_data_region_id(region_id);
let source_data_region_id = utils::to_data_region_id(source_region_id);
info!(
"Syncing region from region {} to region {}, parallelism: {}",
source_region_id, region_id, parallelism
);
let res = self
.mito
.copy_region_from(
target_metadata_region_id,
MitoCopyRegionFromRequest {
source_region_id: source_metadata_region_id,
parallelism,
},
)
.await
.map_err(BoxedError::new)
.context(MitoCopyRegionFromOperationSnafu {
source_region_id: source_metadata_region_id,
target_region_id: target_metadata_region_id,
})?;
if res.copied_file_ids.is_empty() {
info!(
"No files were copied from source region {} to target region {}, copied file ids are empty",
source_metadata_region_id, target_metadata_region_id
);
return Ok(SyncRegionFromResponse::Metric {
metadata_synced: false,
data_synced: false,
new_opened_logical_region_ids: vec![],
});
}
let target_region = self.mito.find_region(target_metadata_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: target_metadata_region_id,
},
)?;
let files_to_remove = target_region.file_metas(&res.copied_file_ids).await;
let missing_file_ids = res
.copied_file_ids
.iter()
.zip(&files_to_remove)
.filter_map(|(file_id, maybe_meta)| {
if maybe_meta.is_none() {
Some(*file_id)
} else {
None
}
})
.collect::<Vec<_>>();
// `copy_region_from` does not trigger compaction,
// so there should be no files removed and thus no missing files.
ensure!(
missing_file_ids.is_empty(),
MissingFilesSnafu {
region_id: target_metadata_region_id,
file_ids: missing_file_ids,
}
);
let files_to_remove = files_to_remove.into_iter().flatten().collect::<Vec<_>>();
// Transform the logical region metadata of the target data region.
self.metadata_region
.transform_logical_region_metadata(target_data_region_id, source_data_region_id)
.await?;
let edit = RegionEdit {
files_to_add: vec![],
files_to_remove: files_to_remove.clone(),
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
};
self.mito
.edit_region(target_metadata_region_id, edit)
.await
.map_err(BoxedError::new)
.context(MitoEditRegionSnafu {
region_id: target_metadata_region_id,
})?;
info!(
"Successfully edit metadata region: {} after syncing from source metadata region: {}, files to remove: {:?}",
target_metadata_region_id,
source_metadata_region_id,
files_to_remove
.iter()
.map(|meta| meta.file_id)
.collect::<Vec<_>>(),
);
let now = Instant::now();
// Always recover states from the target metadata region after syncing
// from the source metadata region.
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&target_data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: target_data_region_id,
})?
.options();
let new_opened_logical_region_ids = self
.recover_states(target_data_region_id, physical_region_options)
.await?;
info!(
"Sync metadata region from source region {} to target region {}, recover states cost: {:?}, new opened logical region ids: {:?}",
source_metadata_region_id,
target_metadata_region_id,
now.elapsed(),
new_opened_logical_region_ids
);
Ok(SyncRegionFromResponse::Metric {
metadata_synced: true,
data_synced: false,
new_opened_logical_region_ids,
})
}
}
#[cfg(test)]
mod tests {
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::debug;
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_engine::{RegionEngine, SyncRegionFromRequest};
use store_api::region_request::{
BatchRegionDdlRequest, PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest,
RegionRequest,
};
use store_api::storage::RegionId;
use crate::metadata_region::MetadataRegion;
use crate::test_util::{TestEnv, create_logical_region_request};
async fn assert_logical_table_columns(
metadata_region: &MetadataRegion,
physical_region_id: RegionId,
logical_region_id: RegionId,
expected_columns: &[&str],
) {
let mut columns = metadata_region
.logical_columns(physical_region_id, logical_region_id)
.await
.unwrap()
.into_iter()
.map(|(n, _)| n)
.collect::<Vec<_>>();
columns.sort_unstable();
assert_eq!(columns, expected_columns);
}
#[tokio::test]
async fn test_sync_region_from_region() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
let metric_engine = env.metric();
let source_physical_region_id = RegionId::new(1024, 0);
let logical_region_id1 = RegionId::new(1025, 0);
let logical_region_id2 = RegionId::new(1026, 0);
env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
.await;
let region_create_request1 =
create_logical_region_request(&["job"], source_physical_region_id, "logical1");
let region_create_request2 =
create_logical_region_request(&["host"], source_physical_region_id, "logical2");
metric_engine
.handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
(logical_region_id1, region_create_request1),
(logical_region_id2, region_create_request2),
]))
.await
.unwrap();
debug!("Flushing source physical region");
metric_engine
.handle_request(
source_physical_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
let logical_regions = metric_engine
.logical_regions(source_physical_region_id)
.await
.unwrap();
assert!(logical_regions.contains(&logical_region_id1));
assert!(logical_regions.contains(&logical_region_id2));
let target_physical_region_id = RegionId::new(1024, 1);
let target_logical_region_id1 = RegionId::new(1025, 1);
let target_logical_region_id2 = RegionId::new(1026, 1);
// Prepare target physical region
env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
.await;
let r = metric_engine
.sync_region(
target_physical_region_id,
SyncRegionFromRequest::FromRegion {
source_region_id: source_physical_region_id,
parallelism: 1,
},
)
.await
.unwrap();
let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
assert_eq!(new_opened_logical_region_ids.len(), 2);
assert!(new_opened_logical_region_ids.contains(&target_logical_region_id1));
assert!(new_opened_logical_region_ids.contains(&target_logical_region_id2));
debug!("Sync region from again");
assert_logical_table_columns(
&env.metadata_region(),
target_physical_region_id,
target_logical_region_id1,
&["greptime_timestamp", "greptime_value", "job"],
)
.await;
assert_logical_table_columns(
&env.metadata_region(),
target_physical_region_id,
target_logical_region_id2,
&["greptime_timestamp", "greptime_value", "host"],
)
.await;
let logical_regions = env
.metadata_region()
.logical_regions(target_physical_region_id)
.await
.unwrap();
assert_eq!(logical_regions.len(), 2);
assert!(logical_regions.contains(&target_logical_region_id1));
assert!(logical_regions.contains(&target_logical_region_id2));
// Should be ok to sync region from again.
let r = metric_engine
.sync_region(
target_physical_region_id,
SyncRegionFromRequest::FromRegion {
source_region_id: source_physical_region_id,
parallelism: 1,
},
)
.await
.unwrap();
let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
// Try to close region and reopen it, should be ok.
metric_engine
.handle_request(
target_physical_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap();
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
metric_engine
.handle_request(
target_physical_region_id,
RegionRequest::Open(RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
table_dir: "/test_dir1".to_string(),
path_type: PathType::Bare,
options: physical_region_option,
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
let logical_regions = env
.metadata_region()
.logical_regions(target_physical_region_id)
.await
.unwrap();
assert_eq!(logical_regions.len(), 2);
assert!(logical_regions.contains(&target_logical_region_id1));
assert!(logical_regions.contains(&target_logical_region_id2));
}
#[tokio::test]
async fn test_sync_region_from_region_with_no_files() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
let metric_engine = env.metric();
let source_physical_region_id = RegionId::new(1024, 0);
env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
.await;
let target_physical_region_id = RegionId::new(1024, 1);
env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
.await;
let r = metric_engine
.sync_region(
target_physical_region_id,
SyncRegionFromRequest::FromRegion {
source_region_id: source_physical_region_id,
parallelism: 1,
},
)
.await
.unwrap();
let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
}
#[tokio::test]
async fn test_sync_region_from_region_source_not_exist() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
let metric_engine = env.metric();
let source_physical_region_id = RegionId::new(1024, 0);
let target_physical_region_id = RegionId::new(1024, 1);
env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
.await;
let err = metric_engine
.sync_region(
target_physical_region_id,
SyncRegionFromRequest::FromRegion {
source_region_id: source_physical_region_id,
parallelism: 1,
},
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
}

View File

@@ -21,7 +21,7 @@ use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
use store_api::storage::{FileId, RegionId};
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -128,6 +128,27 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Mito copy region from operation fails, source region id: {}, target region id: {}",
source_region_id,
target_region_id
))]
MitoCopyRegionFromOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
source_region_id: RegionId,
target_region_id: RegionId,
},
#[snafu(display("Mito edit region operation fails, region id: {}", region_id))]
MitoEditRegion {
region_id: RegionId,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode primary key"))]
EncodePrimaryKey {
source: mito_codec::error::Error,
@@ -256,6 +277,21 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unsupported sync region from request for region {}", region_id))]
UnsupportedSyncRegionFromRequest {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Missing file metas in region {}, file ids: {:?}", region_id, file_ids))]
MissingFiles {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
file_ids: Vec<FileId>,
},
#[snafu(display("Unsupported alter kind: {}", kind))]
UnsupportedAlterKind {
kind: String,
@@ -339,11 +375,12 @@ impl ErrorExt for Error {
| ParseRegionOptions { .. }
| UnexpectedRequest { .. }
| UnsupportedAlterKind { .. }
| UnsupportedRemapManifestsRequest { .. } => StatusCode::InvalidArguments,
| UnsupportedRemapManifestsRequest { .. }
| UnsupportedSyncRegionFromRequest { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
StatusCode::Unsupported
}
ForbiddenPhysicalAlter { .. }
| UnsupportedRegionRequest { .. }
| MissingFiles { .. } => StatusCode::Unsupported,
DeserializeColumnMetadata { .. }
| SerializeColumnMetadata { .. }
@@ -369,7 +406,9 @@ impl ErrorExt for Error {
| MitoSyncOperation { source, .. }
| MitoEnterStagingOperation { source, .. }
| BatchOpenMitoRegion { source, .. }
| BatchCatchupMitoRegion { source, .. } => source.status_code(),
| BatchCatchupMitoRegion { source, .. }
| MitoCopyRegionFromOperation { source, .. }
| MitoEditRegion { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),

View File

@@ -25,6 +25,7 @@ use base64::Engine;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use common_base::readable_size::ReadableSize;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, info, warn};
use datafusion::prelude::{col, lit};
use futures_util::TryStreamExt;
use futures_util::stream::BoxStream;
@@ -400,14 +401,11 @@ impl MetadataRegion {
.await
.context(CacheGetSnafu)?;
let range = region_metadata.key_values.range(prefix.to_string()..);
let mut result = HashMap::new();
for (k, v) in range {
if !k.starts_with(prefix) {
break;
}
result.insert(k.clone(), v.clone());
}
get_all_with_prefix(&region_metadata, prefix, |k, v| {
result.insert(k.to_string(), v.to_string());
Ok(())
})?;
Ok(result)
}
@@ -558,6 +556,109 @@ impl MetadataRegion {
Ok(())
}
/// Updates logical region metadata so that any entries previously referencing
/// `source_region_id` are modified to reference the data region of `physical_region_id`.
///
/// This method should be called after copying files from `source_region_id`
/// into the target region. It scans the metadata for the target physical
/// region, finds logical regions with the same region number as the source,
/// and reinserts region and column entries updated to use the target's
/// region number.
pub async fn transform_logical_region_metadata(
&self,
physical_region_id: RegionId,
source_region_id: RegionId,
) -> Result<()> {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
let logical_regions = self
.logical_regions(data_region_id)
.await?
.into_iter()
.filter(|r| r.region_number() == source_region_id.region_number())
.collect::<Vec<_>>();
if logical_regions.is_empty() {
info!(
"No logical regions found from source region {}, physical region id: {}",
source_region_id, physical_region_id,
);
return Ok(());
}
let metadata = self.load_all(metadata_region_id).await?;
let mut output = Vec::new();
for logical_region_id in &logical_regions {
let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id);
get_all_with_prefix(&metadata, &prefix, |k, v| {
// Safety: we have checked the prefix
let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap();
// Change the region number to the data region number.
let new_key = MetadataRegion::concat_column_key(
RegionId::new(
src_logical_region_id.table_id(),
data_region_id.region_number(),
),
&column_name,
);
output.push((new_key, v.to_string()));
Ok(())
})?;
let new_key = MetadataRegion::concat_region_key(RegionId::new(
logical_region_id.table_id(),
data_region_id.region_number(),
));
output.push((new_key, String::new()));
}
if output.is_empty() {
warn!(
"No logical regions metadata found from source region {}, physical region id: {}",
source_region_id, physical_region_id
);
return Ok(());
}
debug!(
"Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}",
data_region_id,
source_region_id,
output.len(),
);
let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter());
self.mito
.handle_request(
metadata_region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
info!(
"Transformed {} logical regions metadata to physical region {}, source region: {}",
logical_regions.len(),
data_region_id,
source_region_id
);
self.cache.invalidate(&metadata_region_id).await;
Ok(())
}
}
fn get_all_with_prefix(
region_metadata: &RegionMetadataCacheEntry,
prefix: &str,
mut callback: impl FnMut(&str, &str) -> Result<()>,
) -> Result<()> {
let range = region_metadata.key_values.range(prefix.to_string()..);
for (k, v) in range {
if !k.starts_with(prefix) {
break;
}
callback(k, v)?;
}
Ok(())
}
#[cfg(test)]

View File

@@ -89,6 +89,7 @@ use api::region::RegionResponse;
use async_trait::async_trait;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::error::UnexpectedSnafu;
use common_meta::key::SchemaMetadataManagerRef;
use common_recordbatch::{MemoryPermit, QueryMemoryTracker, SendableRecordBatchStream};
use common_stat::get_total_memory_bytes;
@@ -107,10 +108,10 @@ use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
BatchResponses, CopyRegionFromRequest, CopyRegionFromResponse, MitoCopyRegionFromResponse,
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
RemapManifestsRequest, RemapManifestsResponse, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncManifestResponse,
BatchResponses, MitoCopyRegionFromRequest, MitoCopyRegionFromResponse, RegionEngine,
RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest, SyncRegionFromResponse,
};
use store_api::region_request::{
AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
@@ -124,8 +125,8 @@ use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::config::MitoConfig;
use crate::engine::puffin_index::{IndexEntryContext, collect_index_entries_from_puffin};
use crate::error::{
self, InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu,
Result, SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu, SerializeColumnMetadataSnafu, SerializeManifestSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -397,7 +398,7 @@ impl MitoEngine {
}
/// Edit region's metadata by [RegionEdit] directly. Use with care.
/// Now we only allow adding files to region (the [RegionEdit] struct can only contain a non-empty "files_to_add" field).
/// Now we only allow adding files or removing files from region (the [RegionEdit] struct can only contain a non-empty "files_to_add" or "files_to_remove" field).
/// Other region editing intention will result in an "invalid request" error.
/// Also note that if a region is to be edited directly, we MUST not write data to it thereafter.
pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
@@ -432,7 +433,7 @@ impl MitoEngine {
pub async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
request: MitoCopyRegionFromRequest,
) -> Result<MitoCopyRegionFromResponse> {
self.inner.copy_region_from(region_id, request).await
}
@@ -641,8 +642,7 @@ impl MitoEngine {
///
/// Only adding or removing files to region is considered valid now.
fn is_valid_region_edit(edit: &RegionEdit) -> bool {
!edit.files_to_add.is_empty()
&& edit.files_to_remove.is_empty()
(!edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty())
&& matches!(
edit,
RegionEdit {
@@ -1075,7 +1075,7 @@ impl EngineInner {
async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
request: MitoCopyRegionFromRequest,
) -> Result<MitoCopyRegionFromResponse> {
let (request, receiver) =
WorkerRequest::try_from_copy_region_from_request(region_id, request)?;
@@ -1249,15 +1249,21 @@ impl RegionEngine for MitoEngine {
async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse, BoxedError> {
request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
let manifest_info = request
.into_region_manifest_info()
.context(UnexpectedSnafu {
err_msg: "Expected a manifest info request",
})
.map_err(BoxedError::new)?;
let (_, synced) = self
.inner
.sync_region(region_id, manifest_info)
.await
.map_err(BoxedError::new)?;
Ok(SyncManifestResponse::Mito { synced })
Ok(SyncRegionFromResponse::Mito { synced })
}
async fn remap_manifests(
@@ -1270,19 +1276,6 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
Err(BoxedError::new(
error::UnsupportedOperationSnafu {
err_msg: "copy_region_from is not supported",
}
.build(),
))
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.inner.role(region_id)
}
@@ -1421,7 +1414,7 @@ mod tests {
};
assert!(is_valid_region_edit(&edit));
// Invalid: "files_to_add" is empty
// Invalid: "files_to_add" and "files_to_remove" are both empty
let edit = RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
@@ -1433,7 +1426,7 @@ mod tests {
};
assert!(!is_valid_region_edit(&edit));
// Invalid: "files_to_remove" is not empty
// Valid: "files_to_remove" is not empty
let edit = RegionEdit {
files_to_add: vec![FileMeta::default()],
files_to_remove: vec![FileMeta::default()],
@@ -1443,7 +1436,7 @@ mod tests {
flushed_sequence: None,
committed_sequence: None,
};
assert!(!is_valid_region_edit(&edit));
assert!(is_valid_region_edit(&edit));
// Invalid: other fields are not all "None"s
let edit = RegionEdit {

View File

@@ -20,7 +20,7 @@ use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use object_store::layers::mock::{Error as MockError, ErrorKind, MockLayerBuilder};
use store_api::region_engine::{CopyRegionFromRequest, RegionEngine, RegionRole};
use store_api::region_engine::{MitoCopyRegionFromRequest, RegionEngine, RegionRole};
use store_api::region_request::{RegionFlushRequest, RegionRequest};
use store_api::storage::RegionId;
@@ -89,7 +89,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index:
let resp = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
MitoCopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
@@ -126,7 +126,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index:
let resp2 = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
MitoCopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
@@ -207,7 +207,7 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) {
let err = engine
.copy_region_from(
target_region_id,
CopyRegionFromRequest {
MitoCopyRegionFromRequest {
source_region_id,
parallelism: 1,
},
@@ -225,7 +225,6 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) {
let source_region_dir = format!("{}/data/test/1_0000000001", env.data_home().display());
assert_file_num_in_dir(&source_region_dir, 1);
assert_file_num_in_dir(&format!("{}/index", source_region_dir), 1);
assert_eq!(
source_region_files,
collect_filename_in_dir(&source_region_dir)
@@ -298,7 +297,7 @@ async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) {
let err = engine
.copy_region_from(
region_id,
CopyRegionFromRequest {
MitoCopyRegionFromRequest {
source_region_id: RegionId::new(2, 1),
parallelism: 1,
},
@@ -309,7 +308,7 @@ async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) {
let err = engine
.copy_region_from(
region_id,
CopyRegionFromRequest {
MitoCopyRegionFromRequest {
source_region_id: RegionId::new(1, 1),
parallelism: 1,
},
@@ -347,7 +346,7 @@ async fn test_engine_copy_region_unexpected_state_with_format(flat_format: bool)
let err = engine
.copy_region_from(
region_id,
CopyRegionFromRequest {
MitoCopyRegionFromRequest {
source_region_id: RegionId::new(1, 2),
parallelism: 1,
},

View File

@@ -153,7 +153,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
// Returns error since the max manifest is 1
let manifest_info = RegionManifestInfo::mito(2, 0, 0);
let err = follower_engine
.sync_region(region_id, manifest_info)
.sync_region(region_id, manifest_info.into())
.await
.unwrap_err();
let err = err.as_any().downcast_ref::<Error>().unwrap();
@@ -161,7 +161,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
let manifest_info = RegionManifestInfo::mito(1, 0, 0);
follower_engine
.sync_region(region_id, manifest_info)
.sync_region(region_id, manifest_info.into())
.await
.unwrap();
common_telemetry::info!("Scan the region on the follower engine after sync");
@@ -266,7 +266,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
// Sync the region from the leader engine to the follower engine
let manifest_info = RegionManifestInfo::mito(2, 0, 0);
follower_engine
.sync_region(region_id, manifest_info)
.sync_region(region_id, manifest_info.into())
.await
.unwrap();
let expected = "\

View File

@@ -824,13 +824,13 @@ impl WorkerRequest {
Ok((WorkerRequest::RemapManifests(request), receiver))
}
/// Converts [CopyRegionFromRequest] from a [CopyRegionFromRequest](store_api::region_engine::CopyRegionFromRequest).
/// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest).
pub(crate) fn try_from_copy_region_from_request(
region_id: RegionId,
store_api::region_engine::CopyRegionFromRequest {
store_api::region_engine::MitoCopyRegionFromRequest {
source_region_id,
parallelism,
}: store_api::region_engine::CopyRegionFromRequest,
}: store_api::region_engine::MitoCopyRegionFromRequest,
) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
let (sender, receiver) = oneshot::channel();
let request = CopyRegionFromRequest {

View File

@@ -28,9 +28,9 @@ use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::{
CopyRegionFromRequest, CopyRegionFromResponse, RegionEngine, RegionManifestInfo, RegionRole,
RegionScannerRef, RegionStatistic, RemapManifestsRequest, RemapManifestsResponse,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest, SyncRegionFromResponse,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber};
@@ -113,8 +113,8 @@ impl RegionEngine for MetaRegionEngine {
async fn sync_region(
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse, BoxedError> {
_request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError> {
unimplemented!()
}
@@ -125,14 +125,6 @@ impl RegionEngine for MetaRegionEngine {
unimplemented!()
}
async fn copy_region_from(
&self,
_region_id: RegionId,
_request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError> {
unimplemented!()
}
fn role(&self, _region_id: RegionId) -> Option<RegionRole> {
None
}

View File

@@ -637,9 +637,64 @@ impl RegionStatistic {
}
}
/// The response of syncing the manifest.
/// Request to sync the region from a manifest or a region.
#[derive(Debug, Clone)]
pub enum SyncRegionFromRequest {
/// Syncs the region using manifest information.
/// Used in leader-follower manifest sync scenarios.
FromManifest(RegionManifestInfo),
/// Syncs the region from another region.
///
/// Used by the metric engine to sync logical regions from a source physical region
/// to a target physical region. This copies metadata region SST files and transforms
/// logical region entries to use the target's region number.
FromRegion {
/// The [`RegionId`] of the source region.
source_region_id: RegionId,
/// The parallelism of the sync operation.
parallelism: usize,
},
}
impl From<RegionManifestInfo> for SyncRegionFromRequest {
fn from(manifest_info: RegionManifestInfo) -> Self {
SyncRegionFromRequest::FromManifest(manifest_info)
}
}
impl SyncRegionFromRequest {
/// Creates a new request from a manifest info.
pub fn from_manifest(manifest_info: RegionManifestInfo) -> Self {
SyncRegionFromRequest::FromManifest(manifest_info)
}
/// Creates a new request from a region.
pub fn from_region(source_region_id: RegionId, parallelism: usize) -> Self {
SyncRegionFromRequest::FromRegion {
source_region_id,
parallelism,
}
}
/// Returns true if the request is from a manifest.
pub fn is_from_manifest(&self) -> bool {
matches!(self, SyncRegionFromRequest::FromManifest { .. })
}
/// Converts the request to a region manifest info.
///
/// Returns None if the request is not from a manifest.
pub fn into_region_manifest_info(self) -> Option<RegionManifestInfo> {
match self {
SyncRegionFromRequest::FromManifest(manifest_info) => Some(manifest_info),
SyncRegionFromRequest::FromRegion { .. } => None,
}
}
}
/// The response of syncing the region.
#[derive(Debug)]
pub enum SyncManifestResponse {
pub enum SyncRegionFromResponse {
NotSupported,
Mito {
/// Indicates if the data region was synced.
@@ -656,35 +711,30 @@ pub enum SyncManifestResponse {
},
}
impl SyncManifestResponse {
impl SyncRegionFromResponse {
/// Returns true if data region is synced.
pub fn is_data_synced(&self) -> bool {
match self {
SyncManifestResponse::NotSupported => false,
SyncManifestResponse::Mito { synced } => *synced,
SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
SyncRegionFromResponse::NotSupported => false,
SyncRegionFromResponse::Mito { synced } => *synced,
SyncRegionFromResponse::Metric { data_synced, .. } => *data_synced,
}
}
/// Returns true if the engine is supported the sync operation.
pub fn is_supported(&self) -> bool {
matches!(self, SyncManifestResponse::NotSupported)
}
/// Returns true if the engine is a mito2 engine.
pub fn is_mito(&self) -> bool {
matches!(self, SyncManifestResponse::Mito { .. })
matches!(self, SyncRegionFromResponse::Mito { .. })
}
/// Returns true if the engine is a metric engine.
pub fn is_metric(&self) -> bool {
matches!(self, SyncManifestResponse::Metric { .. })
matches!(self, SyncRegionFromResponse::Metric { .. })
}
/// Returns the new opened logical region ids.
pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
match self {
SyncManifestResponse::Metric {
SyncRegionFromResponse::Metric {
new_opened_logical_region_ids,
..
} => Some(new_opened_logical_region_ids),
@@ -715,7 +765,7 @@ pub struct RemapManifestsResponse {
/// Request to copy files from a source region to a target region.
#[derive(Debug, Clone)]
pub struct CopyRegionFromRequest {
pub struct MitoCopyRegionFromRequest {
/// The [`RegionId`] of the source region.
pub source_region_id: RegionId,
/// The parallelism of the copy operation.
@@ -728,37 +778,6 @@ pub struct MitoCopyRegionFromResponse {
pub copied_file_ids: Vec<FileId>,
}
#[derive(Debug, Clone)]
pub struct MetricCopyRegionFromResponse {
/// The logical regions that were newly opened after the copy operation.
pub new_opened_logical_region_ids: Vec<RegionId>,
}
/// Response to copy region from a source region to a target region.
#[derive(Debug, Clone)]
pub enum CopyRegionFromResponse {
Mito(MitoCopyRegionFromResponse),
Metric(MetricCopyRegionFromResponse),
}
impl CopyRegionFromResponse {
/// Converts the response to a mito2 response.
pub fn into_mito(self) -> Option<MitoCopyRegionFromResponse> {
match self {
CopyRegionFromResponse::Mito(response) => Some(response),
CopyRegionFromResponse::Metric(_) => None,
}
}
/// Converts the response to a metric response.
pub fn into_metric(self) -> Option<MetricCopyRegionFromResponse> {
match self {
CopyRegionFromResponse::Metric(response) => Some(response),
CopyRegionFromResponse::Mito(_) => None,
}
}
}
#[async_trait]
pub trait RegionEngine: Send + Sync {
/// Name of this engine
@@ -880,8 +899,8 @@ pub trait RegionEngine: Send + Sync {
async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse, BoxedError>;
request: SyncRegionFromRequest,
) -> Result<SyncRegionFromResponse, BoxedError>;
/// Remaps manifests from old regions to new regions.
async fn remap_manifests(
@@ -889,13 +908,6 @@ pub trait RegionEngine: Send + Sync {
request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse, BoxedError>;
/// Copies region from a source region to a target region.
async fn copy_region_from(
&self,
region_id: RegionId,
request: CopyRegionFromRequest,
) -> Result<CopyRegionFromResponse, BoxedError>;
/// Sets region role state gracefully.
///
/// After the call returns, the engine ensures no more write operations will succeed in the region.