diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index c05291050c..14ccfa2816 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -308,20 +308,36 @@ impl RegionServer {
             .with_context(|_| HandleRegionRequestSnafu { region_id })
     }
 
+    /// Syncs the region manifest and registers any newly opened logical regions.
     pub async fn sync_region_manifest(
         &self,
         region_id: RegionId,
         manifest_info: RegionManifestInfo,
     ) -> Result<()> {
-        let engine = self
+        let engine_with_status = self
             .inner
             .region_map
             .get(&region_id)
             .with_context(|| RegionNotFoundSnafu { region_id })?;
-        engine
+
+        let Some(new_opened_regions) = engine_with_status
             .sync_region(region_id, manifest_info)
             .await
-            .with_context(|_| HandleRegionRequestSnafu { region_id })
+            .with_context(|_| HandleRegionRequestSnafu { region_id })?
+            .new_opened_logical_region_ids()
+        else {
+            return Ok(());
+        };
+
+        for region in new_opened_regions {
+            self.inner.region_map.insert(
+                region,
+                RegionEngineWithStatus::Ready(engine_with_status.engine().clone()),
+            );
+            info!("Logical region {} is registered!", region);
+        }
+
+        Ok(())
     }
 
     /// Set region role state gracefully.
@@ -526,6 +542,15 @@ impl RegionEngineWithStatus {
             RegionEngineWithStatus::Ready(engine) => engine,
         }
     }
+
+    /// Returns [RegionEngineRef] reference.
+ pub fn engine(&self) -> &RegionEngineRef { + match self { + RegionEngineWithStatus::Registering(engine) => engine, + RegionEngineWithStatus::Deregistering(engine) => engine, + RegionEngineWithStatus::Ready(engine) => engine, + } + } } impl Deref for RegionEngineWithStatus { @@ -1029,7 +1054,7 @@ impl RegionServerInner { for region in logical_regions { self.region_map .insert(region, RegionEngineWithStatus::Ready(engine.clone())); - debug!("Logical region {} is registered!", region); + info!("Logical region {} is registered!", region); } Ok(()) } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index b349024cc9..f182e1c423 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -33,7 +33,7 @@ use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, + SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -250,7 +250,7 @@ impl RegionEngine for MockRegionEngine { &self, _region_id: RegionId, _manifest_info: RegionManifestInfo, - ) -> Result<(), BoxedError> { + ) -> Result { unimplemented!() } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index cf5e5c7576..09a373caad 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -28,7 +28,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, - SinglePartitionScanner, + SinglePartitionScanner, SyncManifestResponse, }; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, 
RegionDropRequest, RegionOpenRequest, @@ -145,9 +145,9 @@ impl RegionEngine for FileRegionEngine { &self, _region_id: RegionId, _manifest_info: RegionManifestInfo, - ) -> Result<(), BoxedError> { + ) -> Result { // File engine doesn't need to sync region manifest. - Ok(()) + Ok(SyncManifestResponse::NotSupported) } fn role(&self, region_id: RegionId) -> Option { diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 74978fda78..509438b4b2 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -24,6 +24,7 @@ mod put; mod read; mod region_metadata; mod state; +mod sync; use std::any::Any; use std::collections::HashMap; @@ -41,6 +42,7 @@ use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, + SyncManifestResponse, }; use store_api::region_request::{BatchRegionDdlRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -48,7 +50,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use self::state::MetricEngineState; use crate::config::EngineConfig; use crate::data_region::DataRegion; -use crate::error::{self, MetricManifestInfoSnafu, Result, UnsupportedRegionRequestSnafu}; +use crate::error::{self, Result, UnsupportedRegionRequestSnafu}; use crate::metadata_region::MetadataRegion; use crate::row_modifier::RowModifier; use crate::utils; @@ -311,40 +313,11 @@ impl RegionEngine for MetricEngine { &self, region_id: RegionId, manifest_info: RegionManifestInfo, - ) -> Result<(), BoxedError> { - if !manifest_info.is_metric() { - return Err(BoxedError::new( - MetricManifestInfoSnafu { region_id }.build(), - )); - } - - let metadata_region_id = utils::to_metadata_region_id(region_id); - // checked by ensure above - let metadata_manifest_version = manifest_info - 
.metadata_manifest_version() - .unwrap_or_default(); - let metadata_flushed_entry_id = manifest_info - .metadata_flushed_entry_id() - .unwrap_or_default(); - let metadata_region_manifest = - RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id); + ) -> Result { self.inner - .mito - .sync_region(metadata_region_id, metadata_region_manifest) - .await?; - - let data_region_id = utils::to_data_region_id(region_id); - let data_manifest_version = manifest_info.data_manifest_version(); - let data_flushed_entry_id = manifest_info.data_flushed_entry_id(); - let data_region_manifest = - RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id); - - self.inner - .mito - .sync_region(data_region_id, data_region_manifest) - .await?; - - Ok(()) + .sync_region(region_id, manifest_info) + .await + .map_err(BoxedError::new) } async fn set_region_role_state_gracefully( @@ -423,6 +396,7 @@ impl MetricEngine { self.inner.mito.clone() } + /// Returns all logical regions associated with the physical region. 
pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result> { self.inner .metadata_region diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 0b23a80bfd..1d82149a7d 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -145,7 +145,7 @@ impl MetricEngineInner { let _write_guard = self .metadata_region .write_lock_logical_region(*region_id) - .await; + .await?; write_guards.insert(*region_id, _write_guard); } diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index bfb7737df7..1ceb20d206 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -279,9 +279,16 @@ impl MetricEngineInner { .add_logical_regions(physical_region_id, true, logical_region_columns) .await?; - let mut state = self.state.write().unwrap(); - state.add_physical_columns(data_region_id, new_add_columns); - state.add_logical_regions(physical_region_id, logical_regions); + { + let mut state = self.state.write().unwrap(); + state.add_physical_columns(data_region_id, new_add_columns); + state.add_logical_regions(physical_region_id, logical_regions.clone()); + } + for logical_region_id in logical_regions { + self.metadata_region + .open_logical_region(logical_region_id) + .await; + } Ok(()) } diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index eb9f266be2..4b25cf38f2 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -132,12 +132,14 @@ impl MetricEngineInner { /// Includes: /// - Record physical region's column names /// - Record the mapping between logical region id and physical region id + /// + /// Returns new opened logical region ids. 
pub(crate) async fn recover_states( &self, physical_region_id: RegionId, primary_key_encoding: PrimaryKeyEncoding, physical_region_options: PhysicalRegionOptions, - ) -> Result<()> { + ) -> Result> { // load logical regions and physical column names let logical_regions = self .metadata_region @@ -147,7 +149,6 @@ impl MetricEngineInner { .data_region .physical_columns(physical_region_id) .await?; - let logical_region_num = logical_regions.len(); { let mut state = self.state.write().unwrap(); @@ -168,15 +169,22 @@ impl MetricEngineInner { } } + let mut opened_logical_region_ids = Vec::new(); + // The `recover_states` may be called multiple times, we only count the logical regions + // that are opened for the first time. for logical_region_id in logical_regions { - self.metadata_region + if self + .metadata_region .open_logical_region(logical_region_id) - .await; + .await + { + opened_logical_region_ids.push(logical_region_id); + } } - LOGICAL_REGION_COUNT.add(logical_region_num as i64); + LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64); - Ok(()) + Ok(opened_logical_region_ids) } } diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index 9f00235e96..f8e0dd8dc3 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -46,7 +46,7 @@ impl MetricEngineInner { let _read_guard = self .metadata_region .read_lock_logical_region(logical_region_id) - .await; + .await?; // Load logical and physical columns, and intersect them to get logical column metadata. 
let logical_column_metadata = self .metadata_region diff --git a/src/metric-engine/src/engine/sync.rs b/src/metric-engine/src/engine/sync.rs new file mode 100644 index 0000000000..fe0d8ef6d0 --- /dev/null +++ b/src/metric-engine/src/engine/sync.rs @@ -0,0 +1,261 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Instant; + +use common_telemetry::info; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse}; +use store_api::storage::RegionId; + +use crate::engine::MetricEngineInner; +use crate::error::{ + MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result, +}; +use crate::utils; + +impl MetricEngineInner { + pub async fn sync_region( + &self, + region_id: RegionId, + manifest_info: RegionManifestInfo, + ) -> Result { + ensure!( + manifest_info.is_metric(), + MetricManifestInfoSnafu { region_id } + ); + + let metadata_region_id = utils::to_metadata_region_id(region_id); + // checked by ensure above + let metadata_manifest_version = manifest_info + .metadata_manifest_version() + .unwrap_or_default(); + let metadata_flushed_entry_id = manifest_info + .metadata_flushed_entry_id() + .unwrap_or_default(); + let metadata_region_manifest = + RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id); + let metadata_synced = self + .mito + .sync_region(metadata_region_id, 
metadata_region_manifest) + .await + .context(MitoSyncOperationSnafu)? + .is_data_synced(); + + let data_region_id = utils::to_data_region_id(region_id); + let data_manifest_version = manifest_info.data_manifest_version(); + let data_flushed_entry_id = manifest_info.data_flushed_entry_id(); + let data_region_manifest = + RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id); + + let data_synced = self + .mito + .sync_region(data_region_id, data_region_manifest) + .await + .context(MitoSyncOperationSnafu)? + .is_data_synced(); + + if !metadata_synced { + return Ok(SyncManifestResponse::Metric { + metadata_synced, + data_synced, + new_opened_logical_region_ids: vec![], + }); + } + + let now = Instant::now(); + // Recovers the states from the metadata region + // if the metadata manifest version is updated. + let physical_region_options = *self + .state + .read() + .unwrap() + .physical_region_states() + .get(&data_region_id) + .context(PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + })? 
+ .options(); + let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context( + PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + }, + )?; + let new_opened_logical_region_ids = self + .recover_states( + data_region_id, + primary_key_encoding, + physical_region_options, + ) + .await?; + info!( + "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}", + data_region_id, + now.elapsed(), + new_opened_logical_region_ids + ); + + Ok(SyncManifestResponse::Metric { + metadata_synced, + data_synced, + new_opened_logical_region_ids, + }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use api::v1::SemanticType; + use common_telemetry::info; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use store_api::metadata::ColumnMetadata; + use store_api::region_engine::{RegionEngine, RegionManifestInfo}; + use store_api::region_request::{ + AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest, + }; + use store_api::storage::RegionId; + + use crate::metadata_region::MetadataRegion; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_sync_region_with_new_created_logical_regions() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await; + env.init_metric_region().await; + + info!("creating follower engine"); + // Create a follower engine. 
+ let (_follower_mito, follower_metric) = env.create_follower_engine().await; + + let physical_region_id = env.default_physical_region_id(); + + // Flushes the physical region + let metric_engine = env.metric(); + metric_engine + .handle_request( + env.default_physical_region_id(), + RegionRequest::Flush(RegionFlushRequest::default()), + ) + .await + .unwrap(); + + let response = follower_metric + .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0)) + .await + .unwrap(); + assert!(response.is_metric()); + let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); + assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]); + + // Sync again, no new logical region should be opened + let response = follower_metric + .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0)) + .await + .unwrap(); + assert!(response.is_metric()); + let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); + assert!(new_opened_logical_region_ids.is_empty()); + } + + fn test_alter_logical_region_request() -> RegionAlterRequest { + RegionAlterRequest { + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "tag1", + ConcreteDataType::string_datatype(), + false, + ), + }, + location: None, + }], + }, + } + } + + #[tokio::test] + async fn test_sync_region_alter_alter_logical_region() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await; + env.init_metric_region().await; + + info!("creating follower engine"); + let physical_region_id = env.default_physical_region_id(); + // Flushes the physical region + let metric_engine = env.metric(); + metric_engine + .handle_request( + env.default_physical_region_id(), + RegionRequest::Flush(RegionFlushRequest::default()), + ) + .await + 
.unwrap(); + + // Create a follower engine. + let (follower_mito, follower_metric) = env.create_follower_engine().await; + let metric_engine = env.metric(); + let engine_inner = env.metric().inner; + let region_id = env.default_logical_region_id(); + let request = test_alter_logical_region_request(); + + engine_inner + .alter_logical_regions( + physical_region_id, + vec![(region_id, request)], + &mut HashMap::new(), + ) + .await + .unwrap(); + + // Flushes the physical region + metric_engine + .handle_request( + env.default_physical_region_id(), + RegionRequest::Flush(RegionFlushRequest::default()), + ) + .await + .unwrap(); + + // Sync the follower engine + let response = follower_metric + .sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0)) + .await + .unwrap(); + assert!(response.is_metric()); + let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap(); + assert!(new_opened_logical_region_ids.is_empty()); + + let logical_region_id = env.default_logical_region_id(); + let metadata_region = MetadataRegion::new(follower_mito.clone()); + let semantic_type = metadata_region + .column_semantic_type(physical_region_id, logical_region_id, "tag1") + .await + .unwrap() + .unwrap(); + assert_eq!(semantic_type, SemanticType::Tag); + let timestamp_index = metadata_region + .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp") + .await + .unwrap() + .unwrap(); + assert_eq!(timestamp_index, SemanticType::Timestamp); + } +} diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 8be535ec9f..5f853037e1 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -118,6 +118,7 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Mito delete operation fails"))] MitoDeleteOperation { source: BoxedError, @@ -132,6 +133,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Mito sync operation fails"))] + 
MitoSyncOperation { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to collect record batch stream"))] CollectRecordBatchStream { source: common_recordbatch::error::Error, @@ -311,7 +319,8 @@ impl ErrorExt for Error { | MitoWriteOperation { source, .. } | MitoCatchupOperation { source, .. } | MitoFlushOperation { source, .. } - | MitoDeleteOperation { source, .. } => source.status_code(), + | MitoDeleteOperation { source, .. } + | MitoSyncOperation { source, .. } => source.status_code(), EncodePrimaryKey { source, .. } => source.status_code(), diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 2b066a0bde..7e7bae095f 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; @@ -76,11 +77,22 @@ impl MetadataRegion { } } - pub async fn open_logical_region(&self, logical_region_id: RegionId) { - self.logical_region_lock + /// Open a logical region. + /// + /// Returns true if the logical region is opened for the first time. + pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool { + match self + .logical_region_lock .write() .await - .insert(logical_region_id, Arc::new(RwLock::new(()))); + .entry(logical_region_id) + { + Entry::Occupied(_) => false, + Entry::Vacant(vacant_entry) => { + vacant_entry.insert(Arc::new(RwLock::new(()))); + true + } + } } /// Retrieve a read lock guard of given logical region id. @@ -178,6 +190,7 @@ impl MetadataRegion { Ok(columns) } + /// Return all logical regions associated with the physical region. 
pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result> { let metadata_region_id = utils::to_metadata_region_id(physical_region_id); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 284834a029..6bcc002908 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -16,6 +16,7 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value}; +use common_telemetry::debug; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use mito2::config::MitoConfig; @@ -28,7 +29,7 @@ use store_api::metric_engine_consts::{ }; use store_api::region_engine::RegionEngine; use store_api::region_request::{ - AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest, + AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionOpenRequest, RegionRequest, }; use store_api::storage::{ColumnId, RegionId}; @@ -77,6 +78,34 @@ impl TestEnv { self.metric.clone() } + /// Creates a new follower engine with the same config as the leader engine. + pub async fn create_follower_engine(&mut self) -> (MitoEngine, MetricEngine) { + let mito = self + .mito_env + .create_follower_engine(MitoConfig::default()) + .await; + let metric = MetricEngine::new(mito.clone(), EngineConfig::default()); + + let region_id = self.default_physical_region_id(); + debug!("opening default physical region: {region_id}"); + let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(); + metric + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: METRIC_ENGINE_NAME.to_string(), + region_dir: self.default_region_dir(), + options: physical_region_option, + skip_wal_replay: true, + }), + ) + .await + .unwrap(); + (mito, metric) + } + /// Create regions in [MetricEngine] under [`default_region_id`] /// and region dir `"test_metric_region"`. 
/// diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 110f79b875..7b3c7352da 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -82,7 +82,7 @@ use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, - RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, + RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, }; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; @@ -496,7 +496,7 @@ impl EngineInner { &self, region_id: RegionId, manifest_info: RegionManifestInfo, - ) -> Result { + ) -> Result<(ManifestVersion, bool)> { ensure!(manifest_info.is_mito(), MitoManifestInfoSnafu); let manifest_version = manifest_info.data_manifest_version(); let (request, receiver) = @@ -631,12 +631,14 @@ impl RegionEngine for MitoEngine { &self, region_id: RegionId, manifest_info: RegionManifestInfo, - ) -> Result<(), BoxedError> { - self.inner + ) -> Result { + let (_, synced) = self + .inner .sync_region(region_id, manifest_info) .await - .map_err(BoxedError::new) - .map(|_| ()) + .map_err(BoxedError::new)?; + + Ok(SyncManifestResponse::Mito { synced }) } fn role(&self, region_id: RegionId) -> Option { diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index d60f018b4e..2590d7ae6c 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -313,11 +313,12 @@ impl RegionManifestManager { } ); + let region_id = self.manifest.metadata.region_id; // Fetches manifests from the last version strictly. let mut manifests = self .store // Invariant: last_version < target_version. 
- .fetch_manifests_strict_from(last_version + 1, target_version + 1) + .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id) .await?; // Case 2: No manifests in range: [current_version+1, target_version+1) @@ -327,7 +328,7 @@ impl RegionManifestManager { // [Current Version]......[Target Version] // [Follower region] if manifests.is_empty() { - debug!( + info!( "Manifests are not strict from {}, region: {}, tries to install the last checkpoint", last_version, self.manifest.metadata.region_id ); @@ -341,7 +342,7 @@ impl RegionManifestManager { manifests = self .store // Invariant: last_version < target_version. - .fetch_manifests_strict_from(last_version + 1, target_version + 1) + .fetch_manifests_strict_from(last_version + 1, target_version + 1, region_id) .await?; } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index c0ee01ba60..89e23e2cd4 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -29,6 +29,7 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use store_api::manifest::ManifestVersion; +use store_api::storage::RegionId; use tokio::sync::Semaphore; use crate::error::{ @@ -243,12 +244,17 @@ impl ManifestObjectStore { &self, start_version: ManifestVersion, end_version: ManifestVersion, + region_id: RegionId, ) -> Result)>> { let mut manifests = self.fetch_manifests(start_version, end_version).await?; let start_index = manifests.iter().position(|(v, _)| *v == start_version); debug!( - "fetches manifests in range [{},{}), start_index: {:?}", - start_version, end_version, start_index + "Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}", + start_version, + end_version, + start_index, + region_id, + manifests.iter().map(|(v, _)| *v).collect::>() ); if let Some(start_index) = start_index { Ok(manifests.split_off(start_index)) diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs 
index 18ef260abe..33a8f13f07 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -692,7 +692,7 @@ impl WorkerRequest { pub(crate) fn new_sync_region_request( region_id: RegionId, manifest_version: ManifestVersion, - ) -> (WorkerRequest, Receiver>) { + ) -> (WorkerRequest, Receiver>) { let (sender, receiver) = oneshot::channel(); ( WorkerRequest::SyncRegion(RegionSyncRequest { @@ -892,7 +892,8 @@ pub(crate) struct RegionEditResult { pub(crate) struct RegionSyncRequest { pub(crate) region_id: RegionId, pub(crate) manifest_version: ManifestVersion, - pub(crate) sender: Sender>, + /// Returns the latest manifest version and a boolean indicating whether a new manifest was installed. + pub(crate) sender: Sender>, } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index f1bec95514..4fd0de0d7b 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -136,6 +136,7 @@ impl RegionWorkerLoop { } }; + let original_manifest_version = region.manifest_ctx.manifest_version().await; let manifest = match region .manifest_ctx .install_manifest_to(request.manifest_version) @@ -173,7 +174,8 @@ impl RegionWorkerLoop { .build(); region.version_control.overwrite_current(Arc::new(version)); - let _ = sender.send(Ok(manifest.manifest_version)); + let updated = manifest.manifest_version > original_manifest_version; + let _ = sender.send(Ok((manifest.manifest_version, updated))); } } diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 72e4ad093a..2b3b473770 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -29,7 +29,7 @@ use store_api::metadata::{ }; use store_api::region_engine::{ RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, - SetRegionRoleStateResponse, SettableRegionRoleState, + SetRegionRoleStateResponse, SettableRegionRoleState, 
SyncManifestResponse,
 };
 use store_api::region_request::RegionRequest;
 use store_api::storage::{ConcreteDataType, RegionId, ScanRequest, SequenceNumber};
@@ -113,7 +113,7 @@ impl RegionEngine for MetaRegionEngine {
         &self,
         _region_id: RegionId,
         _manifest_info: RegionManifestInfo,
-    ) -> Result<(), BoxedError> {
+    ) -> Result<SyncManifestResponse, BoxedError> {
         unimplemented!()
     }
 
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index 0a38700f1d..8522a2e1ca 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -583,6 +583,62 @@ impl RegionStatistic {
     }
 }
 
+/// The response of syncing the manifest.
+#[derive(Debug)]
+pub enum SyncManifestResponse {
+    NotSupported,
+    Mito {
+        /// Indicates if the data region was synced.
+        synced: bool,
+    },
+    Metric {
+        /// Indicates if the metadata region was synced.
+        metadata_synced: bool,
+        /// Indicates if the data region was synced.
+        data_synced: bool,
+        /// The logical regions that were newly opened during the sync operation.
+        /// This only occurs after the metadata region has been successfully synced.
+        new_opened_logical_region_ids: Vec<RegionId>,
+    },
+}
+
+impl SyncManifestResponse {
+    /// Returns true if the data region is synced (always false for `NotSupported`).
+    pub fn is_data_synced(&self) -> bool {
+        match self {
+            SyncManifestResponse::NotSupported => false,
+            SyncManifestResponse::Mito { synced } => *synced,
+            SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
+        }
+    }
+
+    /// Returns true if the engine supports the sync operation, i.e. the response is not `NotSupported`.
+    pub fn is_supported(&self) -> bool {
+        !matches!(self, SyncManifestResponse::NotSupported)
+    }
+
+    /// Returns true if the engine is a mito2 engine.
+    pub fn is_mito(&self) -> bool {
+        matches!(self, SyncManifestResponse::Mito { .. })
+    }
+
+    /// Returns true if the engine is a metric engine.
+    pub fn is_metric(&self) -> bool {
+        matches!(self, SyncManifestResponse::Metric { .. })
+    }
+
+    /// Returns the newly opened logical region ids, or `None` if this is not a `Metric` response.
+    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
+        match self {
+            SyncManifestResponse::Metric {
+                new_opened_logical_region_ids,
+                ..
+            } => Some(new_opened_logical_region_ids),
+            _ => None,
+        }
+    }
+}
+
 #[async_trait]
 pub trait RegionEngine: Send + Sync {
     /// Name of this engine
@@ -689,7 +745,7 @@ pub trait RegionEngine: Send + Sync {
         &self,
         region_id: RegionId,
         manifest_info: RegionManifestInfo,
-    ) -> Result<(), BoxedError>;
+    ) -> Result<SyncManifestResponse, BoxedError>;
 
     /// Sets region role state gracefully.
     ///