From 57f19212534b2329b1418fd4297b3acf58287eb4 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 13 Apr 2026 17:04:02 +0800 Subject: [PATCH] feat: propagate staging leader through lease and heartbeat (#7950) * feat(mito): expose staging leader role state * fix(region): clear staging metadata on leader exit * feat: propagate staging leader role through heartbeat and metasrv * chore: update comments Signed-off-by: WenyXu * fix(region): unify staging exit role transitions * chore: update proto Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/common/meta/src/datanode.rs | 23 ++ src/datanode/src/alive_keeper.rs | 138 +++++++++ src/datanode/src/heartbeat.rs | 6 +- src/datanode/src/region_server.rs | 1 + .../handler/collect_cluster_info_handler.rs | 2 +- .../handler/collect_leader_region_handler.rs | 2 +- .../src/handler/persist_stats_handler.rs | 5 +- .../src/handler/region_lease_handler.rs | 59 ++++ src/meta-srv/src/region/lease_keeper.rs | 124 +++++++- src/mito2/src/engine.rs | 10 +- .../src/engine/apply_staging_manifest_test.rs | 2 +- src/mito2/src/engine/set_role_state_test.rs | 266 +++++++++++++++++- src/mito2/src/engine/staging_test.rs | 2 +- src/mito2/src/region.rs | 173 +++++++++--- src/mito2/src/region/opener.rs | 5 +- src/mito2/src/worker/handle_apply_staging.rs | 2 +- src/mito2/src/worker/handle_enter_staging.rs | 10 +- src/store-api/src/region_engine.rs | 12 +- 20 files changed, 782 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 872095752b..4f6339d83a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5681,7 +5681,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=092ba1d01e2da676dca66cca7eebb55009da8ef8#092ba1d01e2da676dca66cca7eebb55009da8ef8" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=26a50f4069f50c37d65b45e0d39ae0cb42de5425#26a50f4069f50c37d65b45e0d39ae0cb42de5425" 
dependencies = [ "prost 0.14.1", "prost-types 0.14.1", @@ -5691,7 +5691,6 @@ dependencies = [ "strum_macros 0.25.3", "tonic 0.14.2", "tonic-prost", - "tonic-prost-build", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 227608bf64..34e10d9173 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,7 +154,7 @@ etcd-client = { version = "0.17", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "092ba1d01e2da676dca66cca7eebb55009da8ef8" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "26a50f4069f50c37d65b45e0d39ae0cb42de5425" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index 8b521d8e43..d6c6229801 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -573,4 +573,27 @@ mod tests { let region_num = stat_val.region_num().unwrap(); assert_eq!(2, region_num); } + + #[test] + fn test_region_stat_from_heartbeat_preserves_staging_leader_role() { + let request = HeartbeatRequest { + header: Some(RequestHeader::default()), + peer: Some(api::v1::meta::Peer { + id: 1, + addr: "127.0.0.1:3001".to_string(), + }), + region_stats: vec![api::v1::meta::RegionStat { + region_id: RegionId::new(1024, 1).as_u64(), + engine: "mito".to_string(), + role: api::v1::meta::RegionRole::StagingLeader.into(), + ..Default::default() + }], + ..Default::default() + }; + + let stat = Stat::try_from(&request).unwrap(); + + assert_eq!(stat.region_stats.len(), 1); + assert_eq!(stat.region_stats[0].role, RegionRole::StagingLeader); + } } diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 57f4e00aa2..dbf99fdb28 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -503,6 +503,7 @@ mod test { use mito2::config::MitoConfig; use mito2::test_util::{CreateRequestBuilder, TestEnv}; use 
store_api::region_engine::RegionEngine; + use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective}; use super::*; use crate::tests::mock_region_server; @@ -621,4 +622,141 @@ mod test { > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4) ); } + + #[tokio::test(flavor = "multi_thread")] + async fn renew_staging_leader_keeps_region_in_staging() { + let mut region_server = mock_region_server(); + let mut engine_env = TestEnv::with_prefix("region-alive-keeper-staging").await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + let engine = Arc::new(engine); + region_server.register_engine(engine.clone()); + + let alive_keeper = Arc::new(RegionAliveKeeper::new( + region_server.clone(), + None, + Duration::from_millis(100), + )); + + let region_id = RegionId::new(1024, 2); + region_server + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + region_server + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + + alive_keeper.register_region(region_id).await; + alive_keeper + .renew_region_leases( + &[GrantedRegion { + region_id: region_id.as_u64(), + role: api::v1::meta::RegionRole::StagingLeader.into(), + extensions: HashMap::new(), + }], + Instant::now() + Duration::from_millis(3000), + ) + .await; + + assert_eq!(engine.role(region_id).unwrap(), RegionRole::StagingLeader); + } + + #[tokio::test(flavor = "multi_thread")] + async fn renew_staging_leader_exit_into_leader() { + common_telemetry::init_default_ut_logging(); + let mut region_server = mock_region_server(); + let mut engine_env = TestEnv::with_prefix("region-alive-keeper-staging-exit").await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + let engine = Arc::new(engine); + region_server.register_engine(engine.clone()); + + let 
alive_keeper = Arc::new(RegionAliveKeeper::new( + region_server.clone(), + None, + Duration::from_millis(100), + )); + + let region_id = RegionId::new(1024, 2); + region_server + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + region_server + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + + alive_keeper.register_region(region_id).await; + alive_keeper + .renew_region_leases( + &[GrantedRegion { + region_id: region_id.as_u64(), + role: api::v1::meta::RegionRole::Leader.into(), + extensions: HashMap::new(), + }], + Instant::now() + Duration::from_millis(3000), + ) + .await; + + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader); + } + + #[tokio::test(flavor = "multi_thread")] + async fn renew_staging_leader_does_not_promote_normal_leader_into_staging() { + let mut region_server = mock_region_server(); + let mut engine_env = TestEnv::with_prefix("region-alive-keeper-non-staging").await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + let engine = Arc::new(engine); + region_server.register_engine(engine.clone()); + + let alive_keeper = Arc::new(RegionAliveKeeper::new( + region_server.clone(), + None, + Duration::from_millis(100), + )); + + let region_id = RegionId::new(1024, 4); + region_server + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + region_server + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); + + alive_keeper.register_region(region_id).await; + alive_keeper + .renew_region_leases( + &[GrantedRegion { + region_id: region_id.as_u64(), + role: api::v1::meta::RegionRole::StagingLeader.into(), + extensions: HashMap::new(), + }], + Instant::now() + Duration::from_millis(3000), + ) + 
.await; + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader); + } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index be662dfe94..fe8866b7f9 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -148,9 +148,9 @@ impl HeartbeatTask { let mut follower_region_lease_count = 0; for lease in &lease.regions { match lease.role() { - RegionRole::Leader | RegionRole::DowngradingLeader => { - leader_region_lease_count += 1 - } + RegionRole::Leader + | RegionRole::StagingLeader + | RegionRole::DowngradingLeader => leader_region_lease_count += 1, RegionRole::Follower => follower_region_lease_count += 1, } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index ec10691bea..aa3ffbfe3a 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -360,6 +360,7 @@ impl RegionServer { engine.role(region_id).map(|role| match role { RegionRole::Follower => false, RegionRole::Leader => true, + RegionRole::StagingLeader => true, RegionRole::DowngradingLeader => true, }) }) diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index c96229f9cf..3fc785a1cb 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -129,7 +129,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { let leader_regions = stat .region_stats .iter() - .filter(|s| s.role == RegionRole::Leader) + .filter(|s| matches!(s.role, RegionRole::Leader | RegionRole::StagingLeader)) .count(); let follower_regions = stat.region_stats.len() - leader_regions; diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs index ddb4cd0ea3..95b03e3341 100644 --- 
a/src/meta-srv/src/handler/collect_leader_region_handler.rs +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -40,7 +40,7 @@ impl HeartbeatHandler for CollectLeaderRegionHandler { let mut key_values = Vec::with_capacity(current_stat.region_stats.len()); for stat in current_stat.region_stats.iter() { - if stat.role != RegionRole::Leader { + if !matches!(stat.role, RegionRole::Leader | RegionRole::StagingLeader) { continue; } diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 75281f982a..d863070225 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -121,7 +121,10 @@ fn to_persisted_if_leader( datanode_id: DatanodeId, timestamp_millis: i64, ) -> Option<(Row, PersistedRegionStat)> { - if matches!(region_stat.role, RegionRole::Leader) { + if matches!( + region_stat.role, + RegionRole::Leader | RegionRole::StagingLeader + ) { let persisted_region_stat = last_persisted_region_stats.get(®ion_stat.id).map(|s| *s); Some(( compute_persist_region_stat( diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index fef84ef0db..c6c1d44521 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -398,6 +398,65 @@ mod test { assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id])); } + #[tokio::test] + async fn test_handle_staging_leader() { + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let peer = Peer::empty(datanode_id); + let table_info = new_test_table_info(table_id); + + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + leader_state: Some(LeaderState::Staging), + ..Default::default() + }]; + + let keeper = new_test_keeper(); + let 
table_metadata_manager = keeper.table_metadata_manager(); + + table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(region_routes), + HashMap::default(), + ) + .await + .unwrap(); + + let builder = MetasrvBuilder::new(); + let metasrv = builder.build().await.unwrap(); + let ctx = &mut metasrv.new_ctx(); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() + }; + + let acc = &mut HeartbeatAccumulator::default(); + acc.stat = Some(Stat { + id: peer.id, + region_stats: vec![new_empty_region_stat(region_id, RegionRole::StagingLeader)], + ..Default::default() + }); + + let handler = RegionLeaseHandler::new( + default_distributed_time_constants().region_lease.as_secs(), + table_metadata_manager.clone(), + Default::default(), + None, + ); + + handler.handle(&req, ctx, acc).await.unwrap(); + + assert_region_lease( + acc, + vec![GrantedRegion::new(region_id, RegionRole::StagingLeader)], + ); + } + fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec) { let region_lease = acc.region_lease.as_ref().unwrap().clone(); let granted: Vec = region_lease diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index 6d282fb49f..ac9f7d71b9 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -63,7 +63,9 @@ fn renew_region_lease_via_region_route( if let Some(leader) = ®ion_route.leader_peer && leader.id == datanode_id { - let region_role = if region_route.is_leader_downgrading() { + let region_role = if region_route.is_leader_staging() { + RegionRole::StagingLeader + } else if region_route.is_leader_downgrading() { RegionRole::DowngradingLeader } else { RegionRole::Leader @@ -313,6 +315,12 @@ mod tests { renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id), Some((region_id, RegionRole::DowngradingLeader)) ); + + region_route.leader_state = Some(LeaderState::Staging); + assert_eq!( + 
renew_region_lease_via_region_route(®ion_route, leader_peer_id, region_id), + Some((region_id, RegionRole::StagingLeader)) + ); } #[tokio::test] @@ -581,4 +589,118 @@ mod tests { ); } } + + #[tokio::test] + async fn test_renew_region_leases_reported_staging_expected_leader() { + let table_id = 1024; + let table_info: TableInfo = new_test_table_info(table_id); + + let region_id = RegionId::new(table_id, 1); + let leader_peer_id = 1024; + let region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .build() + .unwrap(); + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(vec![region_route]), + HashMap::default(), + ) + .await + .unwrap(); + + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(leader_peer_id, &[(region_id, RegionRole::StagingLeader)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!( + renewed, + HashMap::from([( + region_id, + RegionLeaseInfo::from((region_id, RegionRole::Leader)) + )]) + ); + } + + #[tokio::test] + async fn test_renew_region_leases_reported_staging_expected_staging() { + let table_id = 1024; + let table_info: TableInfo = new_test_table_info(table_id); + + let region_id = RegionId::new(table_id, 1); + let leader_peer_id = 1024; + let region_route = RegionRouteBuilder::default() + .region(Region::new_test(region_id)) + .leader_peer(Peer::empty(leader_peer_id)) + .leader_state(LeaderState::Staging) + .build() + .unwrap(); + + let keeper = new_test_keeper(); + let table_metadata_manager = keeper.table_metadata_manager(); + table_metadata_manager + .create_table_metadata( + table_info, + TableRouteValue::physical(vec![region_route]), + HashMap::default(), + ) + .await + .unwrap(); + + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + 
.renew_region_leases(leader_peer_id, &[(region_id, RegionRole::StagingLeader)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!( + renewed, + HashMap::from([( + region_id, + RegionLeaseInfo::from((region_id, RegionRole::StagingLeader)) + )]) + ); + } + + #[tokio::test] + async fn test_renew_region_leases_operating_region_preserves_reported_role() { + let keeper = new_test_keeper(); + let datanode_id = 1024; + let region_id = RegionId::new(2048, 1); + + let _guard = keeper + .memory_region_keeper + .register(datanode_id, region_id) + .unwrap(); + + let RenewRegionLeasesResponse { + non_exists, + renewed, + } = keeper + .renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)]) + .await + .unwrap(); + + assert!(non_exists.is_empty()); + assert_eq!( + renewed, + HashMap::from([( + region_id, + RegionLeaseInfo::operating(region_id, RegionRole::StagingLeader) + )]) + ); + } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d006067f0d..5bd1002581 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -1114,13 +1114,9 @@ impl EngineInner { } fn role(&self, region_id: RegionId) -> Option { - self.workers.get_region(region_id).map(|region| { - if region.is_follower() { - RegionRole::Follower - } else { - RegionRole::Leader - } - }) + self.workers + .get_region(region_id) + .map(|region| region.region_role()) } } diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs index a82fcfe049..efa0713cfc 100644 --- a/src/mito2/src/engine/apply_staging_manifest_test.rs +++ b/src/mito2/src/engine/apply_staging_manifest_test.rs @@ -333,7 +333,7 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) { let staging_manifest = region.manifest_ctx.staging_manifest().await; assert!(staging_manifest.is_none()); // The staging partition expr should be cleared. 
- assert!(region.staging_partition_info.lock().unwrap().is_none()); + assert!(region.manifest_ctx.staging_partition_info().is_none()); // The staging manifest directory should be empty. let data_home = env.data_home(); let region_dir = format!("{}/data/test/1_0000000001", data_home.display()); diff --git a/src/mito2/src/engine/set_role_state_test.rs b/src/mito2/src/engine/set_role_state_test.rs index 4fb15ab7fe..40e03b063a 100644 --- a/src/mito2/src/engine/set_role_state_test.rs +++ b/src/mito2/src/engine/set_role_state_test.rs @@ -19,7 +19,9 @@ use store_api::region_engine::{ RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, }; -use store_api::region_request::{RegionPutRequest, RegionRequest}; +use store_api::region_request::{ + EnterStagingRequest, RegionPutRequest, RegionRequest, StagingPartitionDirective, +}; use store_api::storage::RegionId; use crate::config::MitoConfig; @@ -241,12 +243,14 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) { .await .unwrap(); assert_success_response(&result, 0); + assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader)); let result = engine .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader) .await .unwrap(); assert_success_response(&result, 0); + assert_eq!(engine.role(region_id), Some(RegionRole::Leader)); // Leader -> StagingLeader -> Follower (exit staging via demotion) engine @@ -259,6 +263,7 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) { .await .unwrap(); assert_success_response(&result, 0); + assert_eq!(engine.role(region_id), Some(RegionRole::Follower)); // Note: Direct Follower -> Leader promotion is no longer allowed // Use existing set_region_role method for follower -> leader promotion @@ -277,6 +282,7 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) { .await .unwrap(); assert_success_response(&result, 0); + assert_eq!(engine.role(region_id), 
Some(RegionRole::DowngradingLeader)); // Note: Direct DowngradingLeader -> Leader is no longer allowed // Use existing set_region_role method for downgrading -> leader promotion @@ -325,6 +331,264 @@ async fn test_restricted_state_transitions() { test_restricted_state_transitions_with_format(true).await; } +#[tokio::test] +async fn test_direct_set_region_role_staging_leader_is_noop() { + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .set_region_role(region_id, RegionRole::StagingLeader) + .unwrap(); + + assert_eq!(engine.role(region_id), Some(RegionRole::Leader)); + + engine + .set_region_role(region_id, RegionRole::Follower) + .unwrap(); + engine + .set_region_role(region_id, RegionRole::StagingLeader) + .unwrap(); + + assert_eq!(engine.role(region_id), Some(RegionRole::Follower)); +} + +#[tokio::test] +async fn test_direct_set_region_role_exits_staging_state_only() { + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader)); + assert!( + engine + .get_region(region_id) + .unwrap() + .manifest_ctx + .staging_partition_info() + .is_some() + ); + + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); + assert_eq!(engine.role(region_id), Some(RegionRole::Leader)); + assert!( + engine + .get_region(region_id) + 
.unwrap() + .manifest_ctx + .staging_partition_info() + .is_none() + ); +} + +#[tokio::test] +async fn test_set_region_role_can_exit_staging_to_leader() { + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader) + .await + .unwrap(); + assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader)); + + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); + + assert_eq!(engine.role(region_id), Some(RegionRole::Leader)); + assert!( + engine + .get_region(region_id) + .unwrap() + .manifest_ctx + .staging_partition_info() + .is_none() + ); +} + +#[tokio::test] +async fn test_set_region_role_leader_clears_staging_partition_info() { + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert!(region.manifest_ctx.staging_partition_info().is_some()); + + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert_eq!(engine.role(region_id), Some(RegionRole::Leader)); + assert!(region.manifest_ctx.staging_partition_info().is_none()); +} + +#[tokio::test] +async fn test_set_region_role_follower_clears_staging_partition_info() { + let mut env = TestEnv::new().await; + let engine 
= env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert!(region.manifest_ctx.staging_partition_info().is_some()); + + engine + .set_region_role(region_id, RegionRole::Follower) + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert_eq!(engine.role(region_id), Some(RegionRole::Follower)); + assert!(region.manifest_ctx.staging_partition_info().is_none()); +} + +#[tokio::test] +async fn test_set_region_role_downgrading_leader_clears_staging_partition_info() { + let mut env = TestEnv::new().await; + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert!(region.manifest_ctx.staging_partition_info().is_some()); + + engine + .set_region_role(region_id, RegionRole::DowngradingLeader) + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert_eq!(engine.role(region_id), Some(RegionRole::DowngradingLeader)); + assert!(region.manifest_ctx.staging_partition_info().is_none()); +} + +#[tokio::test] +async fn test_can_reenter_staging_after_direct_exit_cleanup() { + let mut env = TestEnv::new().await; + let engine = 
env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + engine + .set_region_role(region_id, RegionRole::Follower) + .unwrap(); + engine + .set_region_role(region_id, RegionRole::Leader) + .unwrap(); + + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader)); + assert!(region.manifest_ctx.staging_partition_info().is_some()); +} + async fn test_restricted_state_transitions_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let engine = env diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index bd90779e0b..9846933d1f 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -547,7 +547,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) { .await .unwrap(); let region = engine.get_region(region_id).unwrap(); - let staging_partition_info = region.staging_partition_info.lock().unwrap().clone(); + let staging_partition_info = region.manifest_ctx.staging_partition_info(); assert_eq!( staging_partition_info .unwrap() diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 26ab96c779..3804b28afb 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -156,11 +156,6 @@ pub struct MitoRegion { pub(crate) topic_latest_entry_id: AtomicU64, /// The total bytes written to the region. 
pub(crate) written_bytes: Arc, - /// Partition info of the region in staging mode. - /// - /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated, - /// so we need to store the partition info separately. - pub(crate) staging_partition_info: Mutex>, /// manifest stats stats: ManifestStats, } @@ -333,6 +328,17 @@ impl MitoRegion { self.manifest_ctx.set_role(next_role, self.region_id); } + pub(crate) fn region_role(&self) -> RegionRole { + match self.state() { + RegionRoleState::Follower => RegionRole::Follower, + RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader, + RegionRoleState::Leader(RegionLeaderState::Downgrading) => { + RegionRole::DowngradingLeader + } + RegionRoleState::Leader(_) => RegionRole::Leader, + } + } + /// Sets the altering state. /// You should call this method in the worker loop. pub(crate) fn set_altering(&self) -> Result<()> { @@ -393,9 +399,8 @@ impl MitoRegion { /// You should call this method in the worker loop. /// Transitions from Staging to Writable state. 
pub fn exit_staging(&self) -> Result<()> { - *self.staging_partition_info.lock().unwrap() = None; - self.compare_exchange_state( - RegionLeaderState::Staging, + self.manifest_ctx.exit_staging( + self.region_id, RegionRoleState::Leader(RegionLeaderState::Writable), ) } @@ -819,7 +824,7 @@ impl MitoRegion { pub fn maybe_staging_partition_expr_str(&self) -> Option { let is_staging = self.is_staging(); if is_staging { - let staging_partition_info = self.staging_partition_info.lock().unwrap(); + let staging_partition_info = self.manifest_ctx.staging_partition_info(); if staging_partition_info.is_none() { warn!( "Staging partition expr is none for region {} in staging state", @@ -837,8 +842,8 @@ impl MitoRegion { pub fn expected_partition_expr_version(&self) -> u64 { if self.is_staging() { - let staging_partition_info = self.staging_partition_info.lock().unwrap(); - staging_partition_info + self.manifest_ctx + .staging_partition_info() .as_ref() .map(|info| info.partition_rule_version) .unwrap_or_default() @@ -852,8 +857,8 @@ impl MitoRegion { if !self.is_staging() { return false; } - let staging_partition_info = self.staging_partition_info.lock().unwrap(); - staging_partition_info + self.manifest_ctx + .staging_partition_info() .as_ref() .map(|info| { matches!( @@ -873,6 +878,11 @@ pub(crate) struct ManifestContext { /// The state of the region. The region checks the state before updating /// manifest. state: AtomicCell, + /// Partition info of the region in staging mode. + /// + /// During the staging mode, the region metadata in [`VersionControlRef`] is not updated, + /// so we need to store the partition info separately. 
+ staging_partition_info: Mutex>, } impl ManifestContext { @@ -880,9 +890,46 @@ impl ManifestContext { ManifestContext { manifest_manager: tokio::sync::RwLock::new(manager), state: AtomicCell::new(state), + staging_partition_info: Mutex::new(None), } } + pub(crate) fn staging_partition_info(&self) -> Option { + self.staging_partition_info.lock().unwrap().clone() + } + + pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) { + let mut current = self.staging_partition_info.lock().unwrap(); + debug_assert!(current.is_none()); + *current = Some(staging_partition_info); + } + + fn clear_staging_partition_info(&self) { + *self.staging_partition_info.lock().unwrap() = None; + } + + pub(crate) fn exit_staging( + &self, + region_id: RegionId, + next_state: RegionRoleState, + ) -> Result<()> { + self.state + .compare_exchange( + RegionRoleState::Leader(RegionLeaderState::Staging), + next_state, + ) + .map_err(|actual| { + RegionStateSnafu { + region_id, + state: actual, + expect: RegionRoleState::Leader(RegionLeaderState::Staging), + } + .build() + })?; + self.clear_staging_partition_info(); + Ok(()) + } + pub(crate) async fn manifest_version(&self) -> ManifestVersion { self.manifest_manager .read() @@ -1028,27 +1075,50 @@ impl ManifestContext { /// Sets the [`RegionRole`]. 
/// /// ```text - /// +------------------------------------------+ - /// | +-----------------+ | - /// | | | | - /// +---+------+ +-------+-----+ +--v-v---+ - /// | Follower | | Downgrading | | Leader | - /// +---^-^----+ +-----+-^-----+ +--+-+---+ - /// | | | | | | - /// | +------------------+ +-----------------+ | - /// +------------------------------------------+ - /// - /// Transition: - /// - Follower -> Leader - /// - Downgrading Leader -> Leader - /// - Leader -> Follower - /// - Downgrading Leader -> Follower - /// - Leader -> Downgrading Leader + /// +---------------------+ + /// | Staging Leader | + /// +----------+----------+ + /// | + /// v + /// +----------+ +------+-------+ +-------------+ + /// | Follower | <-> | Leader | <-> | Downgrading | + /// +-----+----+ +------+-------+ +------+------+ + /// ^ ^ | + /// +-----------------+--------------------+ /// /// ``` + /// + /// # State Transitions + /// + /// From `Follower`: + /// - `Follower -> Leader` + /// + /// From `Leader`: + /// - `Leader -> Follower` + /// - `Leader -> Downgrading Leader` + /// + /// From `Staging Leader`: + /// - `Staging Leader -> Leader` + /// - `Staging Leader -> Follower` + /// - `Staging Leader -> Downgrading Leader` + /// + /// From `Downgrading Leader`: + /// - `Downgrading Leader -> Leader` + /// - `Downgrading Leader -> Follower` pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) { match next_role { RegionRole::Follower => { + if self + .exit_staging(region_id, RegionRoleState::Follower) + .is_ok() + { + info!( + "Convert region {} to follower, previous role state: {:?}", + region_id, + RegionRoleState::Leader(RegionLeaderState::Staging) + ); + return; + } match self.state.fetch_update(|state| { if !matches!(state, RegionRoleState::Follower) { Some(RegionRoleState::Follower) @@ -1071,6 +1141,20 @@ impl ManifestContext { } } RegionRole::Leader => { + if self + .exit_staging( + region_id, + RegionRoleState::Leader(RegionLeaderState::Writable), + ) 
+ .is_ok() + { + info!( + "Convert region {} to leader, previous role state: {:?}", + region_id, + RegionRoleState::Leader(RegionLeaderState::Staging) + ); + return; + } match self.state.fetch_update(|state| { if matches!( state, @@ -1096,7 +1180,27 @@ impl ManifestContext { } } } + RegionRole::StagingLeader => { + info!( + "Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow", + region_id + ); + } RegionRole::DowngradingLeader => { + if self + .exit_staging( + region_id, + RegionRoleState::Leader(RegionLeaderState::Downgrading), + ) + .is_ok() + { + info!( + "Convert region {} to downgrading region, previous role state: {:?}", + region_id, + RegionRoleState::Leader(RegionLeaderState::Staging) + ); + return; + } match self.state.compare_exchange( RegionRoleState::Leader(RegionLeaderState::Writable), RegionRoleState::Leader(RegionLeaderState::Downgrading), @@ -1438,8 +1542,8 @@ pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result StagingLeader should be ignored. 
+ manifest_ctx.set_role(RegionRole::StagingLeader, region_id); + assert_eq!( + manifest_ctx.state.load(), + RegionRoleState::Leader(RegionLeaderState::Writable) + ); + // Leader -> Downgrading Leader manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id); assert_eq!( @@ -1825,7 +1935,6 @@ mod tests { topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), stats: ManifestStats::default(), - staging_partition_info: Mutex::new(None), }; // Test initial state diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index b23e73557d..c1240c3829 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -17,7 +17,7 @@ use std::any::TypeId; use std::collections::HashMap; use std::sync::atomic::{AtomicI64, AtomicU64}; -use std::sync::{Arc, LazyLock, Mutex}; +use std::sync::{Arc, LazyLock}; use std::time::Instant; use common_telemetry::{debug, error, info, warn}; @@ -349,7 +349,6 @@ impl RegionOpener { topic_latest_entry_id: AtomicU64::new(0), written_bytes: Arc::new(AtomicU64::new(0)), stats: self.stats, - staging_partition_info: Mutex::new(None), })) } @@ -586,8 +585,6 @@ impl RegionOpener { topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id), written_bytes: Arc::new(AtomicU64::new(0)), stats: self.stats.clone(), - // TODO(weny): reload the staging partition info from the manifest. 
- staging_partition_info: Mutex::new(None), }; let region = Arc::new(region); diff --git a/src/mito2/src/worker/handle_apply_staging.rs b/src/mito2/src/worker/handle_apply_staging.rs index e773150356..876d5c3c31 100644 --- a/src/mito2/src/worker/handle_apply_staging.rs +++ b/src/mito2/src/worker/handle_apply_staging.rs @@ -75,7 +75,7 @@ impl RegionWorkerLoop { return; } - let staging_partition_info = region.staging_partition_info.lock().unwrap().clone(); + let staging_partition_info = region.manifest_ctx.staging_partition_info(); let staging_partition_expr = staging_partition_info .as_ref() diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs index 8b75fdd24f..83bd51df15 100644 --- a/src/mito2/src/worker/handle_enter_staging.rs +++ b/src/mito2/src/worker/handle_enter_staging.rs @@ -42,7 +42,7 @@ impl RegionWorkerLoop { // If the region is already in staging mode, verify the partition directive matches. if region.is_staging() { - let staging_partition_info = region.staging_partition_info.lock().unwrap().clone(); + let staging_partition_info = region.manifest_ctx.staging_partition_info(); // If the partition directive mismatches, return error. 
if staging_partition_info .as_ref() @@ -279,10 +279,8 @@ impl RegionWorkerLoop { region: &MitoRegionRef, partition_directive: StagingPartitionDirective, ) { - let mut staging_partition_info = region.staging_partition_info.lock().unwrap(); - debug_assert!(staging_partition_info.is_none()); - *staging_partition_info = Some(StagingPartitionInfo::from_partition_directive( - partition_directive, - )); + region.manifest_ctx.set_staging_partition_info( + StagingPartitionInfo::from_partition_directive(partition_directive), + ); } } diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 287f64d225..b235fcffc7 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -67,7 +67,7 @@ impl From<SettableRegionRoleState> for RegionRole { SettableRegionRoleState::Follower => RegionRole::Follower, SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader, SettableRegionRoleState::Leader => RegionRole::Leader, - SettableRegionRoleState::StagingLeader => RegionRole::Leader, // Still a leader role + SettableRegionRoleState::StagingLeader => RegionRole::StagingLeader, } } } @@ -210,6 +210,11 @@ pub enum RegionRole { Follower, // Writable region(mito2), Readonly region(file). Leader, + // Leader is in staging mode. + // + // This is leader-like and writable, but it follows the staging workflow + // semantics instead of a normal leader's steady state. + StagingLeader, // Leader is downgrading to follower. // // This state is used to prevent new write requests.
@@ -221,6 +226,7 @@ impl Display for RegionRole { match self { RegionRole::Follower => write!(f, "Follower"), RegionRole::Leader => write!(f, "Leader"), + RegionRole::StagingLeader => write!(f, "Leader(Staging)"), RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"), } } @@ -228,7 +234,7 @@ impl Display for RegionRole { impl RegionRole { pub fn writable(&self) -> bool { - matches!(self, RegionRole::Leader) + matches!(self, RegionRole::Leader | RegionRole::StagingLeader) } } @@ -237,6 +243,7 @@ impl From<RegionRole> for PbRegionRole { match value { RegionRole::Follower => PbRegionRole::Follower, RegionRole::Leader => PbRegionRole::Leader, + RegionRole::StagingLeader => PbRegionRole::StagingLeader, RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader, } } @@ -246,6 +253,7 @@ impl From<PbRegionRole> for RegionRole { fn from(value: PbRegionRole) -> Self { match value { PbRegionRole::Leader => RegionRole::Leader, + PbRegionRole::StagingLeader => RegionRole::StagingLeader, PbRegionRole::Follower => RegionRole::Follower, PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader, }