fix: ensure logical regions are synced during region sync (#5878)

* fix: ensure logical regions are synced during region sync

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-04-14 20:37:31 +08:00
committed by GitHub
parent 7ddd7a9888
commit c522893552
19 changed files with 470 additions and 76 deletions

View File

@@ -308,20 +308,36 @@ impl RegionServer {
.with_context(|_| HandleRegionRequestSnafu { region_id })
}
/// Sync region manifest and registers new opened logical regions.
pub async fn sync_region_manifest(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<()> {
let engine = self
let engine_with_status = self
.inner
.region_map
.get(&region_id)
.with_context(|| RegionNotFoundSnafu { region_id })?;
engine
let Some(new_opened_regions) = engine_with_status
.sync_region(region_id, manifest_info)
.await
.with_context(|_| HandleRegionRequestSnafu { region_id })
.with_context(|_| HandleRegionRequestSnafu { region_id })?
.new_opened_logical_region_ids()
else {
return Ok(());
};
for region in new_opened_regions {
self.inner.region_map.insert(
region,
RegionEngineWithStatus::Ready(engine_with_status.engine().clone()),
);
info!("Logical region {} is registered!", region);
}
Ok(())
}
/// Set region role state gracefully.
@@ -526,6 +542,15 @@ impl RegionEngineWithStatus {
RegionEngineWithStatus::Ready(engine) => engine,
}
}
/// Returns [RegionEngineRef] reference.
pub fn engine(&self) -> &RegionEngineRef {
match self {
RegionEngineWithStatus::Registering(engine) => engine,
RegionEngineWithStatus::Deregistering(engine) => engine,
RegionEngineWithStatus::Ready(engine) => engine,
}
}
}
impl Deref for RegionEngineWithStatus {
@@ -1029,7 +1054,7 @@ impl RegionServerInner {
for region in logical_regions {
self.region_map
.insert(region, RegionEngineWithStatus::Ready(engine.clone()));
debug!("Logical region {} is registered!", region);
info!("Logical region {} is registered!", region);
}
Ok(())
}

View File

@@ -33,7 +33,7 @@ use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -250,7 +250,7 @@ impl RegionEngine for MockRegionEngine {
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
) -> Result<SyncManifestResponse, BoxedError> {
unimplemented!()
}

View File

@@ -28,7 +28,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SinglePartitionScanner,
SinglePartitionScanner, SyncManifestResponse,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
@@ -145,9 +145,9 @@ impl RegionEngine for FileRegionEngine {
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
) -> Result<SyncManifestResponse, BoxedError> {
// File engine doesn't need to sync region manifest.
Ok(())
Ok(SyncManifestResponse::NotSupported)
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {

View File

@@ -24,6 +24,7 @@ mod put;
mod read;
mod region_metadata;
mod state;
mod sync;
use std::any::Any;
use std::collections::HashMap;
@@ -41,6 +42,7 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState,
SyncManifestResponse,
};
use store_api::region_request::{BatchRegionDdlRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -48,7 +50,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
use self::state::MetricEngineState;
use crate::config::EngineConfig;
use crate::data_region::DataRegion;
use crate::error::{self, MetricManifestInfoSnafu, Result, UnsupportedRegionRequestSnafu};
use crate::error::{self, Result, UnsupportedRegionRequestSnafu};
use crate::metadata_region::MetadataRegion;
use crate::row_modifier::RowModifier;
use crate::utils;
@@ -311,40 +313,11 @@ impl RegionEngine for MetricEngine {
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
if !manifest_info.is_metric() {
return Err(BoxedError::new(
MetricManifestInfoSnafu { region_id }.build(),
));
}
let metadata_region_id = utils::to_metadata_region_id(region_id);
// checked by ensure above
let metadata_manifest_version = manifest_info
.metadata_manifest_version()
.unwrap_or_default();
let metadata_flushed_entry_id = manifest_info
.metadata_flushed_entry_id()
.unwrap_or_default();
let metadata_region_manifest =
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
) -> Result<SyncManifestResponse, BoxedError> {
self.inner
.mito
.sync_region(metadata_region_id, metadata_region_manifest)
.await?;
let data_region_id = utils::to_data_region_id(region_id);
let data_manifest_version = manifest_info.data_manifest_version();
let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
let data_region_manifest =
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
self.inner
.mito
.sync_region(data_region_id, data_region_manifest)
.await?;
Ok(())
.sync_region(region_id, manifest_info)
.await
.map_err(BoxedError::new)
}
async fn set_region_role_state_gracefully(
@@ -423,6 +396,7 @@ impl MetricEngine {
self.inner.mito.clone()
}
/// Returns all logical regions associated with the physical region.
pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
self.inner
.metadata_region

View File

@@ -145,7 +145,7 @@ impl MetricEngineInner {
let _write_guard = self
.metadata_region
.write_lock_logical_region(*region_id)
.await;
.await?;
write_guards.insert(*region_id, _write_guard);
}

View File

@@ -279,9 +279,16 @@ impl MetricEngineInner {
.add_logical_regions(physical_region_id, true, logical_region_columns)
.await?;
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions);
{
let mut state = self.state.write().unwrap();
state.add_physical_columns(data_region_id, new_add_columns);
state.add_logical_regions(physical_region_id, logical_regions.clone());
}
for logical_region_id in logical_regions {
self.metadata_region
.open_logical_region(logical_region_id)
.await;
}
Ok(())
}

View File

@@ -132,12 +132,14 @@ impl MetricEngineInner {
/// Includes:
/// - Record physical region's column names
/// - Record the mapping between logical region id and physical region id
///
/// Returns new opened logical region ids.
pub(crate) async fn recover_states(
&self,
physical_region_id: RegionId,
primary_key_encoding: PrimaryKeyEncoding,
physical_region_options: PhysicalRegionOptions,
) -> Result<()> {
) -> Result<Vec<RegionId>> {
// load logical regions and physical column names
let logical_regions = self
.metadata_region
@@ -147,7 +149,6 @@ impl MetricEngineInner {
.data_region
.physical_columns(physical_region_id)
.await?;
let logical_region_num = logical_regions.len();
{
let mut state = self.state.write().unwrap();
@@ -168,15 +169,22 @@ impl MetricEngineInner {
}
}
let mut opened_logical_region_ids = Vec::new();
// The `recover_states` may be called multiple times, we only count the logical regions
// that are opened for the first time.
for logical_region_id in logical_regions {
self.metadata_region
if self
.metadata_region
.open_logical_region(logical_region_id)
.await;
.await
{
opened_logical_region_ids.push(logical_region_id);
}
}
LOGICAL_REGION_COUNT.add(logical_region_num as i64);
LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
Ok(())
Ok(opened_logical_region_ids)
}
}

View File

@@ -46,7 +46,7 @@ impl MetricEngineInner {
let _read_guard = self
.metadata_region
.read_lock_logical_region(logical_region_id)
.await;
.await?;
// Load logical and physical columns, and intersect them to get logical column metadata.
let logical_column_metadata = self
.metadata_region

View File

@@ -0,0 +1,261 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_telemetry::info;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse};
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;
use crate::error::{
MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::utils;
impl MetricEngineInner {
pub async fn sync_region(
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<SyncManifestResponse> {
ensure!(
manifest_info.is_metric(),
MetricManifestInfoSnafu { region_id }
);
let metadata_region_id = utils::to_metadata_region_id(region_id);
// checked by ensure above
let metadata_manifest_version = manifest_info
.metadata_manifest_version()
.unwrap_or_default();
let metadata_flushed_entry_id = manifest_info
.metadata_flushed_entry_id()
.unwrap_or_default();
let metadata_region_manifest =
RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
let metadata_synced = self
.mito
.sync_region(metadata_region_id, metadata_region_manifest)
.await
.context(MitoSyncOperationSnafu)?
.is_data_synced();
let data_region_id = utils::to_data_region_id(region_id);
let data_manifest_version = manifest_info.data_manifest_version();
let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
let data_region_manifest =
RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
let data_synced = self
.mito
.sync_region(data_region_id, data_region_manifest)
.await
.context(MitoSyncOperationSnafu)?
.is_data_synced();
if !metadata_synced {
return Ok(SyncManifestResponse::Metric {
metadata_synced,
data_synced,
new_opened_logical_region_ids: vec![],
});
}
let now = Instant::now();
// Recovers the states from the metadata region
// if the metadata manifest version is updated.
let physical_region_options = *self
.state
.read()
.unwrap()
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.options();
let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?;
let new_opened_logical_region_ids = self
.recover_states(
data_region_id,
primary_key_encoding,
physical_region_options,
)
.await?;
info!(
"Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
data_region_id,
now.elapsed(),
new_opened_logical_region_ids
);
Ok(SyncManifestResponse::Metric {
metadata_synced,
data_synced,
new_opened_logical_region_ids,
})
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::SemanticType;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::{RegionEngine, RegionManifestInfo};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::metadata_region::MetadataRegion;
use crate::test_util::TestEnv;
#[tokio::test]
async fn test_sync_region_with_new_created_logical_regions() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
env.init_metric_region().await;
info!("creating follower engine");
// Create a follower engine.
let (_follower_mito, follower_metric) = env.create_follower_engine().await;
let physical_region_id = env.default_physical_region_id();
// Flushes the physical region
let metric_engine = env.metric();
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
let response = follower_metric
.sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]);
// Sync again, no new logical region should be opened
let response = follower_metric
.sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
}
fn test_alter_logical_region_request() -> RegionAlterRequest {
RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag1",
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
}],
},
}
}
#[tokio::test]
async fn test_sync_region_alter_alter_logical_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
env.init_metric_region().await;
info!("creating follower engine");
let physical_region_id = env.default_physical_region_id();
// Flushes the physical region
let metric_engine = env.metric();
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
// Create a follower engine.
let (follower_mito, follower_metric) = env.create_follower_engine().await;
let metric_engine = env.metric();
let engine_inner = env.metric().inner;
let region_id = env.default_logical_region_id();
let request = test_alter_logical_region_request();
engine_inner
.alter_logical_regions(
physical_region_id,
vec![(region_id, request)],
&mut HashMap::new(),
)
.await
.unwrap();
// Flushes the physical region
metric_engine
.handle_request(
env.default_physical_region_id(),
RegionRequest::Flush(RegionFlushRequest::default()),
)
.await
.unwrap();
// Sync the follower engine
let response = follower_metric
.sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0))
.await
.unwrap();
assert!(response.is_metric());
let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
assert!(new_opened_logical_region_ids.is_empty());
let logical_region_id = env.default_logical_region_id();
let metadata_region = MetadataRegion::new(follower_mito.clone());
let semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "tag1")
.await
.unwrap()
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.await
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
}
}

View File

@@ -118,6 +118,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Mito delete operation fails"))]
MitoDeleteOperation {
source: BoxedError,
@@ -132,6 +133,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Mito sync operation fails"))]
MitoSyncOperation {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
@@ -311,7 +319,8 @@ impl ErrorExt for Error {
| MitoWriteOperation { source, .. }
| MitoCatchupOperation { source, .. }
| MitoFlushOperation { source, .. }
| MitoDeleteOperation { source, .. } => source.status_code(),
| MitoDeleteOperation { source, .. }
| MitoSyncOperation { source, .. } => source.status_code(),
EncodePrimaryKey { source, .. } => source.status_code(),

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
@@ -76,11 +77,22 @@ impl MetadataRegion {
}
}
pub async fn open_logical_region(&self, logical_region_id: RegionId) {
self.logical_region_lock
/// Open a logical region.
///
/// Returns true if the logical region is opened for the first time.
pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
match self
.logical_region_lock
.write()
.await
.insert(logical_region_id, Arc::new(RwLock::new(())));
.entry(logical_region_id)
{
Entry::Occupied(_) => false,
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(Arc::new(RwLock::new(())));
true
}
}
}
/// Retrieve a read lock guard of given logical region id.
@@ -178,6 +190,7 @@ impl MetadataRegion {
Ok(columns)
}
/// Return all logical regions associated with the physical region.
pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);

View File

@@ -16,6 +16,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use common_telemetry::debug;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::config::MitoConfig;
@@ -28,7 +29,7 @@ use store_api::metric_engine_consts::{
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{ColumnId, RegionId};
@@ -77,6 +78,34 @@ impl TestEnv {
self.metric.clone()
}
/// Creates a new follower engine with the same config as the leader engine.
pub async fn create_follower_engine(&mut self) -> (MitoEngine, MetricEngine) {
let mito = self
.mito_env
.create_follower_engine(MitoConfig::default())
.await;
let metric = MetricEngine::new(mito.clone(), EngineConfig::default());
let region_id = self.default_physical_region_id();
debug!("opening default physical region: {region_id}");
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
metric
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: self.default_region_dir(),
options: physical_region_option,
skip_wal_replay: true,
}),
)
.await
.unwrap();
(mito, metric)
}
/// Create regions in [MetricEngine] under [`default_region_id`]
/// and region dir `"test_metric_region"`.
///

View File

@@ -82,7 +82,7 @@ use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
@@ -496,7 +496,7 @@ impl EngineInner {
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<ManifestVersion> {
) -> Result<(ManifestVersion, bool)> {
ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu);
let manifest_version = manifest_info.data_manifest_version();
let (request, receiver) =
@@ -631,12 +631,14 @@ impl RegionEngine for MitoEngine {
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
self.inner
) -> Result<SyncManifestResponse, BoxedError> {
let (_, synced) = self
.inner
.sync_region(region_id, manifest_info)
.await
.map_err(BoxedError::new)
.map(|_| ())
.map_err(BoxedError::new)?;
Ok(SyncManifestResponse::Mito { synced })
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {

View File

@@ -313,11 +313,12 @@ impl RegionManifestManager {
}
);
let region_id = self.manifest.metadata.region_id;
// Fetches manifests from the last version strictly.
let mut manifests = self
.store
// Invariant: last_version < target_version.
.fetch_manifests_strict_from(last_version + 1, target_version + 1)
.fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
.await?;
// Case 2: No manifests in range: [current_version+1, target_version+1)
@@ -327,7 +328,7 @@ impl RegionManifestManager {
// [Current Version]......[Target Version]
// [Follower region]
if manifests.is_empty() {
debug!(
info!(
"Manifests are not strict from {}, region: {}, tries to install the last checkpoint",
last_version, self.manifest.metadata.region_id
);
@@ -341,7 +342,7 @@ impl RegionManifestManager {
manifests = self
.store
// Invariant: last_version < target_version.
.fetch_manifests_strict_from(last_version + 1, target_version + 1)
.fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id)
.await?;
}

View File

@@ -29,6 +29,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;
use crate::error::{
@@ -243,12 +244,17 @@ impl ManifestObjectStore {
&self,
start_version: ManifestVersion,
end_version: ManifestVersion,
region_id: RegionId,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let mut manifests = self.fetch_manifests(start_version, end_version).await?;
let start_index = manifests.iter().position(|(v, _)| *v == start_version);
debug!(
"fetches manifests in range [{},{}), start_index: {:?}",
start_version, end_version, start_index
"Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
start_version,
end_version,
start_index,
region_id,
manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
);
if let Some(start_index) = start_index {
Ok(manifests.split_off(start_index))

View File

@@ -692,7 +692,7 @@ impl WorkerRequest {
pub(crate) fn new_sync_region_request(
region_id: RegionId,
manifest_version: ManifestVersion,
) -> (WorkerRequest, Receiver<Result<ManifestVersion>>) {
) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
let (sender, receiver) = oneshot::channel();
(
WorkerRequest::SyncRegion(RegionSyncRequest {
@@ -892,7 +892,8 @@ pub(crate) struct RegionEditResult {
pub(crate) struct RegionSyncRequest {
pub(crate) region_id: RegionId,
pub(crate) manifest_version: ManifestVersion,
pub(crate) sender: Sender<Result<ManifestVersion>>,
/// Returns the latest manifest version and a boolean indicating whether new maniefst is installed.
pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
#[cfg(test)]

View File

@@ -136,6 +136,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
let original_manifest_version = region.manifest_ctx.manifest_version().await;
let manifest = match region
.manifest_ctx
.install_manifest_to(request.manifest_version)
@@ -173,7 +174,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.build();
region.version_control.overwrite_current(Arc::new(version));
let _ = sender.send(Ok(manifest.manifest_version));
let updated = manifest.manifest_version > original_manifest_version;
let _ = sender.send(Ok((manifest.manifest_version, updated)));
}
}

View File

@@ -29,7 +29,7 @@ use store_api::metadata::{
};
use store_api::region_engine::{
RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic,
SetRegionRoleStateResponse, SettableRegionRoleState,
SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber};
@@ -113,7 +113,7 @@ impl RegionEngine for MetaRegionEngine {
&self,
_region_id: RegionId,
_manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError> {
) -> Result<SyncManifestResponse, BoxedError> {
unimplemented!()
}

View File

@@ -583,6 +583,62 @@ impl RegionStatistic {
}
}
/// The response of syncing the manifest.
#[derive(Debug)]
pub enum SyncManifestResponse {
NotSupported,
Mito {
/// Indicates if the data region was synced.
synced: bool,
},
Metric {
/// Indicates if the metadata region was synced.
metadata_synced: bool,
/// Indicates if the data region was synced.
data_synced: bool,
/// The logical regions that were newly opened during the sync operation.
/// This only occurs after the metadata region has been successfully synced.
new_opened_logical_region_ids: Vec<RegionId>,
},
}
impl SyncManifestResponse {
/// Returns true if data region is synced.
pub fn is_data_synced(&self) -> bool {
match self {
SyncManifestResponse::NotSupported => false,
SyncManifestResponse::Mito { synced } => *synced,
SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
}
}
/// Returns true if the engine is supported the sync operation.
pub fn is_supported(&self) -> bool {
matches!(self, SyncManifestResponse::NotSupported)
}
/// Returns true if the engine is a mito2 engine.
pub fn is_mito(&self) -> bool {
matches!(self, SyncManifestResponse::Mito { .. })
}
/// Returns true if the engine is a metric engine.
pub fn is_metric(&self) -> bool {
matches!(self, SyncManifestResponse::Metric { .. })
}
/// Returns the new opened logical region ids.
pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
match self {
SyncManifestResponse::Metric {
new_opened_logical_region_ids,
..
} => Some(new_opened_logical_region_ids),
_ => None,
}
}
}
#[async_trait]
pub trait RegionEngine: Send + Sync {
/// Name of this engine
@@ -689,7 +745,7 @@ pub trait RegionEngine: Send + Sync {
&self,
region_id: RegionId,
manifest_info: RegionManifestInfo,
) -> Result<(), BoxedError>;
) -> Result<SyncManifestResponse, BoxedError>;
/// Sets region role state gracefully.
///