From 41aee1f1b7f4836a9ad89284bfbd56a87ecfc131 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 31 Mar 2025 11:53:47 +0800 Subject: [PATCH] feat: implement `sync_region` for mito engine (#5765) * chore: upgrade proto to `2d52b` * feat: add `SyncRegion` to `WorkerRequest` * feat: impl `sync_region` for `Engine` trait * test: add tests * chore: fmt code * chore: upgrade proto * chore: unify `RegionLeaderState` and `RegionFollowerState` * chore: check immutable memtable * chore: fix clippy * chore: apply suggestions from CR --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/datanode/src/alive_keeper.rs | 10 +- src/datanode/src/tests.rs | 4 + src/file-engine/src/engine.rs | 10 + .../src/handler/region_lease_handler.rs | 2 + src/metric-engine/src/engine.rs | 12 +- src/metric-engine/src/error.rs | 12 +- src/mito2/src/engine.rs | 27 ++ src/mito2/src/engine/sync_test.rs | 235 ++++++++++++++++++ src/mito2/src/error.rs | 12 +- src/mito2/src/region.rs | 52 +++- src/mito2/src/region/version.rs | 6 + src/mito2/src/request.rs | 26 ++ src/mito2/src/worker.rs | 4 + src/mito2/src/worker/handle_catchup.rs | 63 +++-- src/mito2/src/worker/handle_manifest.rs | 59 ++++- src/mito2/src/worker/handle_write.rs | 6 +- src/query/src/optimizer/test_util.rs | 9 + src/store-api/src/region_engine.rs | 13 + 20 files changed, 513 insertions(+), 53 deletions(-) create mode 100644 src/mito2/src/engine/sync_test.rs diff --git a/Cargo.lock b/Cargo.lock index 6a0fefc6ac..2e7fc8305c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4671,7 +4671,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=97e298d119fdb9499bc6ba9e03f375cfa7cdf130#97e298d119fdb9499bc6ba9e03f375cfa7cdf130" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5bf34f1ba22763bfd4ab2ed1dd82fc790746048a#5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" dependencies = [ "prost 0.13.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 
54ac9e2341..00539c5212 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "97e298d119fdb9499bc6ba9e03f375cfa7cdf130" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5bf34f1ba22763bfd4ab2ed1dd82fc790746048a" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index bf9cb16f6b..eff1d31502 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -380,7 +380,9 @@ impl CountdownTask { } }, Some(CountdownCommand::Reset((role, deadline))) => { - let _ = self.region_server.set_region_role(self.region_id, role); + if let Err(err) = self.region_server.set_region_role(self.region_id, role) { + error!(err; "Failed to set region role to {role} for region {region_id}"); + } trace!( "Reset deadline of region {region_id} to approximately {} seconds later.", (deadline - Instant::now()).as_secs_f32(), @@ -402,7 +404,9 @@ impl CountdownTask { } () = &mut countdown => { warn!("The region {region_id} lease is expired, convert region to follower."); - let _ = self.region_server.set_region_role(self.region_id, RegionRole::Follower); + if let Err(err) = self.region_server.set_region_role(self.region_id, RegionRole::Follower) { + error!(err; "Failed to set region role to follower for region {region_id}"); + } // resets the countdown. 
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); countdown.as_mut().reset(far_future); @@ -468,6 +472,8 @@ mod test { &[GrantedRegion { region_id: region_id.as_u64(), role: api::v1::meta::RegionRole::Leader.into(), + // TODO(weny): use real manifest version + manifest_version: 0, }], Instant::now() + Duration::from_millis(200), ) diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 8e8878fa79..cd911f49aa 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -246,6 +246,10 @@ impl RegionEngine for MockRegionEngine { Some(RegionRole::Leader) } + async fn sync_region(&self, _region_id: RegionId, _version: u64) -> Result<(), BoxedError> { + unimplemented!() + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 9bf4432379..77c935b2b3 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -24,6 +24,7 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info}; use object_store::ObjectStore; use snafu::{ensure, OptionExt}; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, @@ -138,6 +139,15 @@ impl RegionEngine for FileRegionEngine { } } + async fn sync_region( + &self, + _region_id: RegionId, + _manifest_version: ManifestVersion, + ) -> Result<(), BoxedError> { + // File engine doesn't need to sync region manifest. 
+ Ok(()) + } + fn role(&self, region_id: RegionId) -> Option { self.inner.state(region_id) } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 64ec1f01e4..7312659d36 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -80,6 +80,8 @@ impl HeartbeatHandler for RegionLeaseHandler { GrantedRegion { region_id, region_role, + // TODO(weny): use real manifest version + manifest_version: 0, } .into() }) diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 48982c2b1b..d678136a34 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -36,6 +36,7 @@ use common_error::status_code::StatusCode; use mito2::engine::MitoEngine; pub(crate) use options::IndexOptions; use snafu::ResultExt; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; use store_api::region_engine::{ @@ -48,7 +49,7 @@ use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use self::state::MetricEngineState; use crate::config::EngineConfig; use crate::data_region::DataRegion; -use crate::error::{self, Result, UnsupportedRegionRequestSnafu}; +use crate::error::{self, Result, UnsupportedRegionRequestSnafu, UnsupportedSyncRegionSnafu}; use crate::metadata_region::MetadataRegion; use crate::row_modifier::RowModifier; use crate::utils; @@ -285,6 +286,15 @@ impl RegionEngine for MetricEngine { Ok(()) } + async fn sync_region( + &self, + _region_id: RegionId, + _manifest_version: ManifestVersion, + ) -> Result<(), BoxedError> { + // TODO(weny): implement it later. 
+ Err(BoxedError::new(UnsupportedSyncRegionSnafu {}.build())) + } + async fn set_region_role_state_gracefully( &self, region_id: RegionId, diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index de0d935ee0..cd321a8841 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -259,6 +259,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Unsupported sync region request"))] + UnsupportedSyncRegion { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -280,9 +286,9 @@ impl ErrorExt for Error { | UnexpectedRequest { .. } | UnsupportedAlterKind { .. } => StatusCode::InvalidArguments, - ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => { - StatusCode::Unsupported - } + ForbiddenPhysicalAlter { .. } + | UnsupportedRegionRequest { .. } + | UnsupportedSyncRegion { .. } => StatusCode::Unsupported, DeserializeColumnMetadata { .. } | SerializeColumnMetadata { .. 
} diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 63f8e0d794..6203f5884a 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -55,6 +55,8 @@ mod row_selector_test; #[cfg(test)] mod set_role_state_test; #[cfg(test)] +mod sync_test; +#[cfg(test)] mod truncate_test; use std::any::Any; @@ -76,6 +78,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; +use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ BatchResponses, RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, @@ -488,6 +491,18 @@ impl EngineInner { receiver.await.context(RecvSnafu) } + async fn sync_region( + &self, + region_id: RegionId, + manifest_version: ManifestVersion, + ) -> Result { + let (request, receiver) = + WorkerRequest::new_sync_region_request(region_id, manifest_version); + self.workers.submit_to_worker(region_id, request).await?; + + receiver.await.context(RecvSnafu)? + } + fn role(&self, region_id: RegionId) -> Option { self.workers.get_region(region_id).map(|region| { if region.is_follower() { @@ -609,6 +624,18 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } + async fn sync_region( + &self, + region_id: RegionId, + manifest_version: ManifestVersion, + ) -> Result<(), BoxedError> { + self.inner + .sync_region(region_id, manifest_version) + .await + .map_err(BoxedError::new) + .map(|_| ()) + } + fn role(&self, region_id: RegionId) -> Option { self.inner.role(region_id) } diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs new file mode 100644 index 0000000000..44f7e33c4e --- /dev/null +++ b/src/mito2/src/engine/sync_test.rs @@ -0,0 +1,235 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; + +use api::v1::{Rows, SemanticType}; +use common_error::ext::ErrorExt; +use common_recordbatch::RecordBatches; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use store_api::metadata::ColumnMetadata; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{ + AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest, +}; +use store_api::storage::{RegionId, ScanRequest}; + +use super::MitoEngine; +use crate::config::MitoConfig; +use crate::error::Error; +use crate::test_util::{ + build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, +}; + +fn add_tag1() -> RegionAlterRequest { + RegionAlterRequest { + schema_version: 0, + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1", + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 3, + }, + location: Some(AddColumnLocation::First), + }], + }, + } +} + +async fn scan_check( + engine: &MitoEngine, + region_id: RegionId, + expected: &str, + num_memtable: usize, + num_files: usize, +) { + let request = ScanRequest::default(); + let scanner = engine.scanner(region_id, request).unwrap(); + assert_eq!(num_memtable, scanner.num_memtables()); + assert_eq!(num_files, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, 
batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_sync_after_flush_region() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + let region_id = RegionId::new(1, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new().build(); + let region_dir = request.region_dir.clone(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Open the region on the follower engine + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: Default::default(), + // Ensure the region is not replayed from the WAL. 
+ skip_wal_replay: true, + }), + ) + .await + .unwrap(); + + flush_region(&engine, region_id, None).await; + + // Scan the region on the leader engine + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + scan_check(&engine, region_id, expected, 0, 1).await; + + common_telemetry::info!("Scan the region on the follower engine"); + // Scan the region on the follower engine + let expected = "++\n++"; + scan_check(&follower_engine, region_id, expected, 0, 0).await; + + // Returns error since the max manifest is 1 + let err = follower_engine.sync_region(region_id, 2).await.unwrap_err(); + let err = err.as_any().downcast_ref::().unwrap(); + assert_matches!(err, Error::InstallManifestTo { .. }); + + follower_engine.sync_region(region_id, 1).await.unwrap(); + common_telemetry::info!("Scan the region on the follower engine after sync"); + // Scan the region on the follower engine + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + scan_check(&follower_engine, region_id, expected, 0, 1).await; +} + +#[tokio::test] +async fn test_sync_after_alter_region() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = 
rows_schema(&request); + let region_dir = request.region_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Open the region on the follower engine + let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + follower_engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + region_dir, + options: Default::default(), + // Ensure the region is not replayed from the WAL. + skip_wal_replay: true, + }), + ) + .await + .unwrap(); + + let request = add_tag1(); + engine + .handle_request(region_id, RegionRequest::Alter(request)) + .await + .unwrap(); + + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_1 | tag_0 | field_0 | ts | ++-------+-------+---------+---------------------+ +| | 0 | 0.0 | 1970-01-01T00:00:00 | +| | 1 | 1.0 | 1970-01-01T00:00:01 | +| | 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------------------+"; + + scan_check(&engine, region_id, expected, 0, 1).await; + let expected = "++\n++"; + scan_check(&follower_engine, region_id, expected, 0, 0).await; + + // Sync the region from the leader engine to the follower engine + follower_engine.sync_region(region_id, 2).await.unwrap(); + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_1 | tag_0 | field_0 | ts | ++-------+-------+---------+---------------------+ +| | 0 | 0.0 | 1970-01-01T00:00:00 | +| | 1 | 1.0 | 1970-01-01T00:00:01 | +| | 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+-------+---------+---------------------+"; + scan_check(&follower_engine, region_id, expected, 0, 1).await; +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 5df76f1e5f..7bcb58e45c 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -33,7 +33,7 @@ use 
store_api::storage::RegionId; use tokio::time::error::Elapsed; use crate::cache::file_cache::FileType; -use crate::region::{RegionLeaderState, RegionRoleState}; +use crate::region::RegionRoleState; use crate::schedule::remote_job_scheduler::JobId; use crate::sst::file::FileId; use crate::worker::WorkerId; @@ -511,10 +511,10 @@ pub enum Error { }, #[snafu(display("Region {} is in {:?} state, expect: {:?}", region_id, state, expect))] - RegionLeaderState { + RegionState { region_id: RegionId, state: RegionRoleState, - expect: RegionLeaderState, + expect: RegionRoleState, #[snafu(implicit)] location: Location, }, @@ -812,8 +812,8 @@ pub enum Error { #[snafu(display( "Failed to install manifest to {}, region: {}, available manifest version: {}, last version: {}", target_version, - available_version, region_id, + available_version, last_version ))] InstallManifestTo { @@ -1125,8 +1125,8 @@ impl ErrorExt for Error { CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), - RegionLeaderState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady, - &FlushableRegionState { .. } => StatusCode::RegionNotReady, + RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady, + FlushableRegionState { .. } => StatusCode::RegionNotReady, JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. 
} => StatusCode::StorageUnavailable, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index a2b55bc7b1..d44f66a4b1 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -35,10 +35,10 @@ use store_api::storage::RegionId; use crate::access_layer::AccessLayerRef; use crate::error::{ - FlushableRegionStateSnafu, RegionLeaderStateSnafu, RegionNotFoundSnafu, RegionTruncatedSnafu, - Result, UpdateManifestSnafu, + FlushableRegionStateSnafu, RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result, + UpdateManifestSnafu, }; -use crate::manifest::action::{RegionMetaAction, RegionMetaActionList}; +use crate::manifest::action::{RegionManifest, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::RegionManifestManager; use crate::memtable::MemtableBuilderRef; use crate::region::version::{VersionControlRef, VersionRef}; @@ -317,10 +317,10 @@ impl MitoRegion { .state .compare_exchange(RegionRoleState::Leader(expect), state) .map_err(|actual| { - RegionLeaderStateSnafu { + RegionStateSnafu { region_id: self.region_id, state: actual, - expect, + expect: RegionRoleState::Leader(expect), } .build() })?; @@ -358,6 +358,21 @@ impl ManifestContext { self.manifest_manager.read().await.has_update().await } + /// Installs the manifest changes from the current version to the target version (inclusive). + /// + /// Returns installed [RegionManifest]. + /// **Note**: This function is not guaranteed to install the target version strictly. + /// The installed version may be greater than the target version. + pub(crate) async fn install_manifest_to( + &self, + version: ManifestVersion, + ) -> Result> { + let mut manager = self.manifest_manager.write().await; + manager.install_manifest_to(version).await?; + + Ok(manager.manifest()) + } + /// Updates the manifest if current state is `expect_state`. 
pub(crate) async fn update_manifest( &self, @@ -394,10 +409,10 @@ impl ManifestContext { } else { ensure!( current_state == RegionRoleState::Leader(expect_state), - RegionLeaderStateSnafu { + RegionStateSnafu { region_id: manifest.metadata.region_id, state: current_state, - expect: expect_state, + expect: RegionRoleState::Leader(expect_state), } ); } @@ -589,15 +604,34 @@ impl RegionMap { .context(RegionNotFoundSnafu { region_id })?; ensure!( region.is_writable(), - RegionLeaderStateSnafu { + RegionStateSnafu { region_id, state: region.state(), - expect: RegionLeaderState::Writable, + expect: RegionRoleState::Leader(RegionLeaderState::Writable), } ); Ok(region) } + /// Gets readonly region by region id. + /// + /// Returns error if the region does not exist or is writable. + pub(crate) fn follower_region(&self, region_id: RegionId) -> Result { + let region = self + .get_region(region_id) + .context(RegionNotFoundSnafu { region_id })?; + ensure!( + region.is_follower(), + RegionStateSnafu { + region_id, + state: region.state(), + expect: RegionRoleState::Follower, + } + ); + + Ok(region) + } + /// Gets region by region id. /// /// Calls the callback if the region does not exist. diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index c27f385ef7..d5078933bc 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -216,6 +216,12 @@ impl VersionControl { version_data.version.ssts.mark_all_deleted(); version_data.version = new_version; } + + /// Overwrites the current version with a new version. 
+ pub(crate) fn overwrite_current(&self, version: VersionRef) { + let mut version_data = self.data.write().unwrap(); + version_data.version = version; + } } pub(crate) type VersionControlRef = Arc; diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index b74db28b84..18ef260abe 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -31,6 +31,7 @@ use prost::Message; use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding}; +use store_api::manifest::ManifestVersion; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_request::{ @@ -565,6 +566,9 @@ pub(crate) enum WorkerRequest { /// Use [RegionEdit] to edit a region directly. EditRegion(RegionEditRequest), + + /// Keep the manifest of a region up to date. + SyncRegion(RegionSyncRequest), } impl WorkerRequest { @@ -684,6 +688,21 @@ impl WorkerRequest { receiver, ) } + + pub(crate) fn new_sync_region_request( + region_id: RegionId, + manifest_version: ManifestVersion, + ) -> (WorkerRequest, Receiver>) { + let (sender, receiver) = oneshot::channel(); + ( + WorkerRequest::SyncRegion(RegionSyncRequest { + region_id, + manifest_version, + sender, + }), + receiver, + ) + } } /// DDL request to a region. 
@@ -869,6 +888,13 @@ pub(crate) struct RegionEditResult { pub(crate) result: Result<()>, } +#[derive(Debug)] +pub(crate) struct RegionSyncRequest { + pub(crate) region_id: RegionId, + pub(crate) manifest_version: ManifestVersion, + pub(crate) sender: Sender>, +} + #[cfg(test)] mod tests { use api::v1::value::ValueData; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 87b51fffdf..0eb5abff41 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -824,6 +824,10 @@ impl RegionWorkerLoop { WorkerRequest::Stop => { debug_assert!(!self.running.load(Ordering::Relaxed)); } + + WorkerRequest::SyncRegion(req) => { + self.handle_region_sync(req).await; + } } } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 8992621dd7..cc7be0f9a3 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -27,6 +27,7 @@ use tokio::time::Instant; use crate::error::{self, Result}; use crate::region::opener::{replay_memtable, RegionOpener}; +use crate::region::MitoRegion; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -45,34 +46,12 @@ impl RegionWorkerLoop { } // Note: Currently, We protect the split brain by ensuring the mutable table is empty. // It's expensive to execute catch-up requests without `set_writable=true` multiple times. - let is_mutable_empty = region.version().memtables.mutable.is_empty(); + let version = region.version(); + let is_empty_memtable = version.memtables.is_empty(); // Utilizes the short circuit evaluation. - let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? 
{ - let manifest_version = region.manifest_ctx.manifest_version().await; - let flushed_entry_id = region.version_control.current().last_entry_id; - info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}"); - let reopened_region = Arc::new( - RegionOpener::new( - region_id, - region.region_dir(), - self.memtable_builder_provider.clone(), - self.object_store_manager.clone(), - self.purge_scheduler.clone(), - self.puffin_manager_factory.clone(), - self.intermediate_manager.clone(), - self.time_provider.clone(), - ) - .cache(Some(self.cache_manager.clone())) - .options(region.version().options.clone())? - .skip_wal_replay(true) - .open(&self.config, &self.wal) - .await?, - ); - debug_assert!(!reopened_region.is_writable()); - self.regions.insert_region(reopened_region.clone()); - - reopened_region + let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? { + self.reopen_region(&region).await? } else { region }; @@ -124,4 +103,36 @@ impl RegionWorkerLoop { Ok(0) } + + /// Reopens a region. + pub(crate) async fn reopen_region( + &mut self, + region: &Arc<MitoRegion>, + ) -> Result<Arc<MitoRegion>> { + let region_id = region.region_id; + let manifest_version = region.manifest_ctx.manifest_version().await; + let flushed_entry_id = region.version_control.current().last_entry_id; + info!("Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}"); + let reopened_region = Arc::new( + RegionOpener::new( + region_id, + region.region_dir(), + self.memtable_builder_provider.clone(), + self.object_store_manager.clone(), + self.purge_scheduler.clone(), + self.puffin_manager_factory.clone(), + self.intermediate_manager.clone(), + self.time_provider.clone(), + ) + .cache(Some(self.cache_manager.clone())) + .options(region.version().options.clone())?
+ .skip_wal_replay(true) + .open(&self.config, &self.wal) + .await?, + ); + debug_assert!(!reopened_region.is_writable()); + self.regions.insert_region(reopened_region.clone()); + + Ok(reopened_region) + } } diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index c79e15b39e..f1bec95514 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -17,6 +17,7 @@ //! It updates the manifest and applies the changes to the region in background. use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; use common_telemetry::{info, warn}; use store_api::logstore::LogStore; @@ -29,10 +30,11 @@ use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD; +use crate::region::version::VersionBuilder; use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState}; use crate::request::{ BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult, - TruncateResult, WorkerRequest, + RegionSyncRequest, TruncateResult, WorkerRequest, }; use crate::sst::location; use crate::worker::{RegionWorkerLoop, WorkerListener}; @@ -118,6 +120,61 @@ impl RegionWorkerLoop { self.handle_region_stalled_requests(&change_result.region_id) .await; } + + /// Handles region sync request. + /// + /// Updates the manifest to at least the given version. + /// **Note**: The installed version may be greater than the given version. 
+ pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) { + let region_id = request.region_id; + let sender = request.sender; + let region = match self.regions.follower_region(region_id) { + Ok(region) => region, + Err(e) => { + let _ = sender.send(Err(e)); + return; + } + }; + + let manifest = match region + .manifest_ctx + .install_manifest_to(request.manifest_version) + .await + { + Ok(manifest) => manifest, + Err(e) => { + let _ = sender.send(Err(e)); + return; + } + }; + let version = region.version(); + if !version.memtables.is_empty() { + warn!( + "Region {} memtables is not empty, which should not happen, manifest version: {}", + region.region_id, manifest.manifest_version + ); + } + let region_options = version.options.clone(); + let new_mutable = Arc::new( + region + .version() + .memtables + .mutable + .new_with_part_duration(version.compaction_time_window), + ); + let metadata = manifest.metadata.clone(); + let version = VersionBuilder::new(metadata, new_mutable) + .add_files(region.file_purger.clone(), manifest.files.values().cloned()) + .flushed_entry_id(manifest.flushed_entry_id) + .flushed_sequence(manifest.flushed_sequence) + .truncated_entry_id(manifest.truncated_entry_id) + .compaction_time_window(manifest.compaction_time_window) + .options(region_options) + .build(); + region.version_control.overwrite_current(Arc::new(version)); + + let _ = sender.send(Ok(manifest.manifest_version)); + } } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 46396a6973..a5dafcdd3a 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -25,7 +25,7 @@ use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::error::{InvalidRequestSnafu, RegionLeaderStateSnafu, RejectWriteSnafu, Result}; +use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result}; 
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED}; use crate::region::{RegionLeaderState, RegionRoleState}; use crate::region_write_ctx::RegionWriteCtx; @@ -240,10 +240,10 @@ impl RegionWorkerLoop { state => { // The region is not writable. sender_req.sender.send( - RegionLeaderStateSnafu { + RegionStateSnafu { region_id, state, - expect: RegionLeaderState::Writable, + expect: RegionRoleState::Leader(RegionLeaderState::Writable), } .fail(), ); diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index 25de1c8a16..a02d2ead5a 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -24,6 +24,7 @@ use async_trait::async_trait; use common_error::ext::{BoxedError, PlainError}; use common_error::status_code::StatusCode; use datatypes::schema::ColumnSchema; +use store_api::manifest::ManifestVersion; use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; @@ -109,6 +110,14 @@ impl RegionEngine for MetaRegionEngine { unimplemented!() } + async fn sync_region( + &self, + _region_id: RegionId, + _manifest_version: ManifestVersion, + ) -> Result<(), BoxedError> { + unimplemented!() + } + fn role(&self, _region_id: RegionId) -> Option { None } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 1dc161b228..ed1d0e4809 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -34,6 +34,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; use crate::logstore::entry; +use crate::manifest::ManifestVersion; use crate::metadata::RegionMetadataRef; use crate::region_request::{ BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest, @@ -84,6 +85,7 @@ impl SetRegionRoleStateResponse { pub struct GrantedRegion { pub region_id: RegionId, pub region_role: RegionRole, + pub manifest_version: u64, } impl GrantedRegion { @@ -91,6 
+93,8 @@ impl GrantedRegion { Self { region_id, region_role, + // TODO(weny): use real manifest version + manifest_version: 0, } } } @@ -100,6 +104,7 @@ impl From for PbGrantedRegion { PbGrantedRegion { region_id: value.region_id.as_u64(), role: PbRegionRole::from(value.region_role).into(), + manifest_version: value.manifest_version, } } } @@ -109,6 +114,7 @@ impl From for GrantedRegion { GrantedRegion { region_id: RegionId::from_u64(value.region_id), region_role: value.role().into(), + manifest_version: value.manifest_version, } } } @@ -503,6 +509,13 @@ pub trait RegionEngine: Send + Sync { /// take effect. fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>; + /// Syncs the region manifest to the given manifest version. + async fn sync_region( + &self, + region_id: RegionId, + manifest_version: ManifestVersion, + ) -> Result<(), BoxedError>; + /// Sets region role state gracefully. /// /// After the call returns, the engine ensures no more write operations will succeed in the region.