feat: propagate staging leader through lease and heartbeat (#7950)

* feat(mito): expose staging leader role state

* fix(region): clear staging metadata on leader exit

* feat: propagate staging leader role through heartbeat and metasrv

* chore: update comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix(region): unify staging exit role transitions

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-04-13 17:04:02 +08:00
committed by GitHub
parent 01a73105b8
commit 57f1921253
20 changed files with 782 additions and 65 deletions

3
Cargo.lock generated
View File

@@ -5681,7 +5681,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=092ba1d01e2da676dca66cca7eebb55009da8ef8#092ba1d01e2da676dca66cca7eebb55009da8ef8"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=26a50f4069f50c37d65b45e0d39ae0cb42de5425#26a50f4069f50c37d65b45e0d39ae0cb42de5425"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",
@@ -5691,7 +5691,6 @@ dependencies = [
"strum_macros 0.25.3",
"tonic 0.14.2",
"tonic-prost",
"tonic-prost-build",
]
[[package]]

View File

@@ -154,7 +154,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "092ba1d01e2da676dca66cca7eebb55009da8ef8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "26a50f4069f50c37d65b45e0d39ae0cb42de5425" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -573,4 +573,27 @@ mod tests {
let region_num = stat_val.region_num().unwrap();
assert_eq!(2, region_num);
}
#[test]
fn test_region_stat_from_heartbeat_preserves_staging_leader_role() {
let request = HeartbeatRequest {
header: Some(RequestHeader::default()),
peer: Some(api::v1::meta::Peer {
id: 1,
addr: "127.0.0.1:3001".to_string(),
}),
region_stats: vec![api::v1::meta::RegionStat {
region_id: RegionId::new(1024, 1).as_u64(),
engine: "mito".to_string(),
role: api::v1::meta::RegionRole::StagingLeader.into(),
..Default::default()
}],
..Default::default()
};
let stat = Stat::try_from(&request).unwrap();
assert_eq!(stat.region_stats.len(), 1);
assert_eq!(stat.region_stats[0].role, RegionRole::StagingLeader);
}
}

View File

@@ -503,6 +503,7 @@ mod test {
use mito2::config::MitoConfig;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
use super::*;
use crate::tests::mock_region_server;
@@ -621,4 +622,141 @@ mod test {
> Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
);
}
#[tokio::test(flavor = "multi_thread")]
async fn renew_staging_leader_keeps_region_in_staging() {
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper-staging").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());
let alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
None,
Duration::from_millis(100),
));
let region_id = RegionId::new(1024, 2);
region_server
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
region_server
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
alive_keeper.register_region(region_id).await;
alive_keeper
.renew_region_leases(
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::StagingLeader.into(),
extensions: HashMap::new(),
}],
Instant::now() + Duration::from_millis(3000),
)
.await;
assert_eq!(engine.role(region_id).unwrap(), RegionRole::StagingLeader);
}
#[tokio::test(flavor = "multi_thread")]
async fn renew_staging_leader_exit_into_leader() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper-staging-exit").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());
let alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
None,
Duration::from_millis(100),
));
let region_id = RegionId::new(1024, 2);
region_server
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
region_server
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
alive_keeper.register_region(region_id).await;
alive_keeper
.renew_region_leases(
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::Leader.into(),
extensions: HashMap::new(),
}],
Instant::now() + Duration::from_millis(3000),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);
}
#[tokio::test(flavor = "multi_thread")]
async fn renew_staging_leader_does_not_promote_normal_leader_into_staging() {
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper-non-staging").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());
let alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
None,
Duration::from_millis(100),
));
let region_id = RegionId::new(1024, 4);
region_server
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
region_server
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
alive_keeper.register_region(region_id).await;
alive_keeper
.renew_region_leases(
&[GrantedRegion {
region_id: region_id.as_u64(),
role: api::v1::meta::RegionRole::StagingLeader.into(),
extensions: HashMap::new(),
}],
Instant::now() + Duration::from_millis(3000),
)
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(engine.role(region_id).unwrap(), RegionRole::Leader);
}
}

View File

@@ -148,9 +148,9 @@ impl HeartbeatTask {
let mut follower_region_lease_count = 0;
for lease in &lease.regions {
match lease.role() {
RegionRole::Leader | RegionRole::DowngradingLeader => {
leader_region_lease_count += 1
}
RegionRole::Leader
| RegionRole::StagingLeader
| RegionRole::DowngradingLeader => leader_region_lease_count += 1,
RegionRole::Follower => follower_region_lease_count += 1,
}
}

View File

@@ -360,6 +360,7 @@ impl RegionServer {
engine.role(region_id).map(|role| match role {
RegionRole::Follower => false,
RegionRole::Leader => true,
RegionRole::StagingLeader => true,
RegionRole::DowngradingLeader => true,
})
})

View File

@@ -129,7 +129,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
let leader_regions = stat
.region_stats
.iter()
.filter(|s| s.role == RegionRole::Leader)
.filter(|s| matches!(s.role, RegionRole::Leader | RegionRole::StagingLeader))
.count();
let follower_regions = stat.region_stats.len() - leader_regions;

View File

@@ -40,7 +40,7 @@ impl HeartbeatHandler for CollectLeaderRegionHandler {
let mut key_values = Vec::with_capacity(current_stat.region_stats.len());
for stat in current_stat.region_stats.iter() {
if stat.role != RegionRole::Leader {
if !matches!(stat.role, RegionRole::Leader | RegionRole::StagingLeader) {
continue;
}

View File

@@ -121,7 +121,10 @@ fn to_persisted_if_leader(
datanode_id: DatanodeId,
timestamp_millis: i64,
) -> Option<(Row, PersistedRegionStat)> {
if matches!(region_stat.role, RegionRole::Leader) {
if matches!(
region_stat.role,
RegionRole::Leader | RegionRole::StagingLeader
) {
let persisted_region_stat = last_persisted_region_stats.get(&region_stat.id).map(|s| *s);
Some((
compute_persist_region_stat(

View File

@@ -398,6 +398,65 @@ mod test {
assert_eq!(acc.inactive_region_ids, HashSet::from([no_exist_region_id]));
}
#[tokio::test]
async fn test_handle_staging_leader() {
let datanode_id = 1;
let region_number = 1u32;
let table_id = 10;
let region_id = RegionId::new(table_id, region_number);
let peer = Peer::empty(datanode_id);
let table_info = new_test_table_info(table_id);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
leader_state: Some(LeaderState::Staging),
..Default::default()
}];
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
HashMap::default(),
)
.await
.unwrap();
let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();
let req = HeartbeatRequest {
duration_since_epoch: 1234,
..Default::default()
};
let acc = &mut HeartbeatAccumulator::default();
acc.stat = Some(Stat {
id: peer.id,
region_stats: vec![new_empty_region_stat(region_id, RegionRole::StagingLeader)],
..Default::default()
});
let handler = RegionLeaseHandler::new(
default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(),
Default::default(),
None,
);
handler.handle(&req, ctx, acc).await.unwrap();
assert_region_lease(
acc,
vec![GrantedRegion::new(region_id, RegionRole::StagingLeader)],
);
}
fn assert_region_lease(acc: &HeartbeatAccumulator, expected: Vec<GrantedRegion>) {
let region_lease = acc.region_lease.as_ref().unwrap().clone();
let granted: Vec<GrantedRegion> = region_lease

View File

@@ -63,7 +63,9 @@ fn renew_region_lease_via_region_route(
if let Some(leader) = &region_route.leader_peer
&& leader.id == datanode_id
{
let region_role = if region_route.is_leader_downgrading() {
let region_role = if region_route.is_leader_staging() {
RegionRole::StagingLeader
} else if region_route.is_leader_downgrading() {
RegionRole::DowngradingLeader
} else {
RegionRole::Leader
@@ -313,6 +315,12 @@ mod tests {
renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
Some((region_id, RegionRole::DowngradingLeader))
);
region_route.leader_state = Some(LeaderState::Staging);
assert_eq!(
renew_region_lease_via_region_route(&region_route, leader_peer_id, region_id),
Some((region_id, RegionRole::StagingLeader))
);
}
#[tokio::test]
@@ -581,4 +589,118 @@ mod tests {
);
}
}
#[tokio::test]
async fn test_renew_region_leases_reported_staging_expected_leader() {
let table_id = 1024;
let table_info: TableInfo = new_test_table_info(table_id);
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;
let region_route = RegionRouteBuilder::default()
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(leader_peer_id))
.build()
.unwrap();
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(leader_peer_id, &[(region_id, RegionRole::StagingLeader)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::from((region_id, RegionRole::Leader))
)])
);
}
#[tokio::test]
async fn test_renew_region_leases_reported_staging_expected_staging() {
let table_id = 1024;
let table_info: TableInfo = new_test_table_info(table_id);
let region_id = RegionId::new(table_id, 1);
let leader_peer_id = 1024;
let region_route = RegionRouteBuilder::default()
.region(Region::new_test(region_id))
.leader_peer(Peer::empty(leader_peer_id))
.leader_state(LeaderState::Staging)
.build()
.unwrap();
let keeper = new_test_keeper();
let table_metadata_manager = keeper.table_metadata_manager();
table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(vec![region_route]),
HashMap::default(),
)
.await
.unwrap();
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(leader_peer_id, &[(region_id, RegionRole::StagingLeader)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::from((region_id, RegionRole::StagingLeader))
)])
);
}
#[tokio::test]
async fn test_renew_region_leases_operating_region_preserves_reported_role() {
let keeper = new_test_keeper();
let datanode_id = 1024;
let region_id = RegionId::new(2048, 1);
let _guard = keeper
.memory_region_keeper
.register(datanode_id, region_id)
.unwrap();
let RenewRegionLeasesResponse {
non_exists,
renewed,
} = keeper
.renew_region_leases(datanode_id, &[(region_id, RegionRole::StagingLeader)])
.await
.unwrap();
assert!(non_exists.is_empty());
assert_eq!(
renewed,
HashMap::from([(
region_id,
RegionLeaseInfo::operating(region_id, RegionRole::StagingLeader)
)])
);
}
}

View File

@@ -1114,13 +1114,9 @@ impl EngineInner {
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
self.workers.get_region(region_id).map(|region| {
if region.is_follower() {
RegionRole::Follower
} else {
RegionRole::Leader
}
})
self.workers
.get_region(region_id)
.map(|region| region.region_role())
}
}

View File

@@ -333,7 +333,7 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
let staging_manifest = region.manifest_ctx.staging_manifest().await;
assert!(staging_manifest.is_none());
// The staging partition expr should be cleared.
assert!(region.staging_partition_info.lock().unwrap().is_none());
assert!(region.manifest_ctx.staging_partition_info().is_none());
// The staging manifest directory should be empty.
let data_home = env.data_home();
let region_dir = format!("{}/data/test/1_0000000001", data_home.display());

View File

@@ -19,7 +19,9 @@ use store_api::region_engine::{
RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
SettableRegionRoleState,
};
use store_api::region_request::{RegionPutRequest, RegionRequest};
use store_api::region_request::{
EnterStagingRequest, RegionPutRequest, RegionRequest, StagingPartitionDirective,
};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
@@ -241,12 +243,14 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) {
.await
.unwrap();
assert_success_response(&result, 0);
assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader));
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
assert_success_response(&result, 0);
assert_eq!(engine.role(region_id), Some(RegionRole::Leader));
// Leader -> StagingLeader -> Follower (exit staging via demotion)
engine
@@ -259,6 +263,7 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) {
.await
.unwrap();
assert_success_response(&result, 0);
assert_eq!(engine.role(region_id), Some(RegionRole::Follower));
// Note: Direct Follower -> Leader promotion is no longer allowed
// Use existing set_region_role method for follower -> leader promotion
@@ -277,6 +282,7 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) {
.await
.unwrap();
assert_success_response(&result, 0);
assert_eq!(engine.role(region_id), Some(RegionRole::DowngradingLeader));
// Note: Direct DowngradingLeader -> Leader is no longer allowed
// Use existing set_region_role method for downgrading -> leader promotion
@@ -325,6 +331,264 @@ async fn test_restricted_state_transitions() {
test_restricted_state_transitions_with_format(true).await;
}
#[tokio::test]
async fn test_direct_set_region_role_staging_leader_is_noop() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.set_region_role(region_id, RegionRole::StagingLeader)
.unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::Leader));
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
engine
.set_region_role(region_id, RegionRole::StagingLeader)
.unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::Follower));
}
#[tokio::test]
async fn test_direct_set_region_role_exits_staging_state_only() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader));
assert!(
engine
.get_region(region_id)
.unwrap()
.manifest_ctx
.staging_partition_info()
.is_some()
);
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::Leader));
assert!(
engine
.get_region(region_id)
.unwrap()
.manifest_ctx
.staging_partition_info()
.is_none()
);
}
#[tokio::test]
async fn test_set_region_role_can_exit_staging_to_leader() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader));
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::Leader));
assert!(
engine
.get_region(region_id)
.unwrap()
.manifest_ctx
.staging_partition_info()
.is_none()
);
}
#[tokio::test]
async fn test_set_region_role_leader_clears_staging_partition_info() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert!(region.manifest_ctx.staging_partition_info().is_some());
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::Leader));
assert!(region.manifest_ctx.staging_partition_info().is_none());
}
#[tokio::test]
async fn test_set_region_role_follower_clears_staging_partition_info() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert!(region.manifest_ctx.staging_partition_info().is_some());
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::Follower));
assert!(region.manifest_ctx.staging_partition_info().is_none());
}
#[tokio::test]
async fn test_set_region_role_downgrading_leader_clears_staging_partition_info() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert!(region.manifest_ctx.staging_partition_info().is_some());
engine
.set_region_role(region_id, RegionRole::DowngradingLeader)
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::DowngradingLeader));
assert!(region.manifest_ctx.staging_partition_info().is_none());
}
#[tokio::test]
async fn test_can_reenter_staging_after_direct_exit_cleanup() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_directive: StagingPartitionDirective::RejectAllWrites,
}),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
assert_eq!(engine.role(region_id), Some(RegionRole::StagingLeader));
assert!(region.manifest_ctx.staging_partition_info().is_some());
}
async fn test_restricted_state_transitions_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env

View File

@@ -547,7 +547,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
let staging_partition_info = region.manifest_ctx.staging_partition_info();
assert_eq!(
staging_partition_info
.unwrap()

View File

@@ -156,11 +156,6 @@ pub struct MitoRegion {
pub(crate) topic_latest_entry_id: AtomicU64,
/// The total bytes written to the region.
pub(crate) written_bytes: Arc<AtomicU64>,
/// Partition info of the region in staging mode.
///
/// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
/// so we need to store the partition info separately.
pub(crate) staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
/// manifest stats
stats: ManifestStats,
}
@@ -333,6 +328,17 @@ impl MitoRegion {
self.manifest_ctx.set_role(next_role, self.region_id);
}
pub(crate) fn region_role(&self) -> RegionRole {
match self.state() {
RegionRoleState::Follower => RegionRole::Follower,
RegionRoleState::Leader(RegionLeaderState::Staging) => RegionRole::StagingLeader,
RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
RegionRole::DowngradingLeader
}
RegionRoleState::Leader(_) => RegionRole::Leader,
}
}
/// Sets the altering state.
/// You should call this method in the worker loop.
pub(crate) fn set_altering(&self) -> Result<()> {
@@ -393,9 +399,8 @@ impl MitoRegion {
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
pub fn exit_staging(&self) -> Result<()> {
*self.staging_partition_info.lock().unwrap() = None;
self.compare_exchange_state(
RegionLeaderState::Staging,
self.manifest_ctx.exit_staging(
self.region_id,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
}
@@ -819,7 +824,7 @@ impl MitoRegion {
pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
let is_staging = self.is_staging();
if is_staging {
let staging_partition_info = self.staging_partition_info.lock().unwrap();
let staging_partition_info = self.manifest_ctx.staging_partition_info();
if staging_partition_info.is_none() {
warn!(
"Staging partition expr is none for region {} in staging state",
@@ -837,8 +842,8 @@ impl MitoRegion {
pub fn expected_partition_expr_version(&self) -> u64 {
if self.is_staging() {
let staging_partition_info = self.staging_partition_info.lock().unwrap();
staging_partition_info
self.manifest_ctx
.staging_partition_info()
.as_ref()
.map(|info| info.partition_rule_version)
.unwrap_or_default()
@@ -852,8 +857,8 @@ impl MitoRegion {
if !self.is_staging() {
return false;
}
let staging_partition_info = self.staging_partition_info.lock().unwrap();
staging_partition_info
self.manifest_ctx
.staging_partition_info()
.as_ref()
.map(|info| {
matches!(
@@ -873,6 +878,11 @@ pub(crate) struct ManifestContext {
/// The state of the region. The region checks the state before updating
/// manifest.
state: AtomicCell<RegionRoleState>,
/// Partition info of the region in staging mode.
///
/// During the staging mode, the region metadata in [`VersionControlRef`] is not updated,
/// so we need to store the partition info separately.
staging_partition_info: Mutex<Option<StagingPartitionInfo>>,
}
impl ManifestContext {
@@ -880,9 +890,46 @@ impl ManifestContext {
ManifestContext {
manifest_manager: tokio::sync::RwLock::new(manager),
state: AtomicCell::new(state),
staging_partition_info: Mutex::new(None),
}
}
pub(crate) fn staging_partition_info(&self) -> Option<StagingPartitionInfo> {
self.staging_partition_info.lock().unwrap().clone()
}
pub(crate) fn set_staging_partition_info(&self, staging_partition_info: StagingPartitionInfo) {
let mut current = self.staging_partition_info.lock().unwrap();
debug_assert!(current.is_none());
*current = Some(staging_partition_info);
}
fn clear_staging_partition_info(&self) {
*self.staging_partition_info.lock().unwrap() = None;
}
pub(crate) fn exit_staging(
&self,
region_id: RegionId,
next_state: RegionRoleState,
) -> Result<()> {
self.state
.compare_exchange(
RegionRoleState::Leader(RegionLeaderState::Staging),
next_state,
)
.map_err(|actual| {
RegionStateSnafu {
region_id,
state: actual,
expect: RegionRoleState::Leader(RegionLeaderState::Staging),
}
.build()
})?;
self.clear_staging_partition_info();
Ok(())
}
pub(crate) async fn manifest_version(&self) -> ManifestVersion {
self.manifest_manager
.read()
@@ -1028,27 +1075,50 @@ impl ManifestContext {
/// Sets the [`RegionRole`].
///
/// ```text
/// +------------------------------------------+
/// | +-----------------+ |
/// | | | |
/// +---+------+ +-------+-----+ +--v-v---+
/// | Follower | | Downgrading | | Leader |
/// +---^-^----+ +-----+-^-----+ +--+-+---+
/// | | | | | |
/// | +------------------+ +-----------------+ |
/// +------------------------------------------+
///
/// Transition:
/// - Follower -> Leader
/// - Downgrading Leader -> Leader
/// - Leader -> Follower
/// - Downgrading Leader -> Follower
/// - Leader -> Downgrading Leader
/// +---------------------+
/// | Staging Leader |
/// +----------+----------+
/// |
/// v
/// +----------+ +------+-------+ +-------------+
/// | Follower | <-> | Leader | <-> | Downgrading |
/// +-----+----+ +------+-------+ +------+------+
/// ^ ^ |
/// +-----------------+--------------------+
///
/// ```
///
/// # State Transitions
///
/// From `Follower`:
/// - `Follower -> Leader`
///
/// From `Leader`:
/// - `Leader -> Follower`
/// - `Leader -> Downgrading Leader`
///
/// From `Staging Leader`:
/// - `Staging Leader -> Leader`
/// - `Staging Leader -> Follower`
/// - `Staging Leader -> Downgrading Leader`
///
/// From `Downgrading Leader`:
/// - `Downgrading Leader -> Leader`
/// - `Downgrading Leader -> Follower`
pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
match next_role {
RegionRole::Follower => {
if self
.exit_staging(region_id, RegionRoleState::Follower)
.is_ok()
{
info!(
"Convert region {} to follower, previous role state: {:?}",
region_id,
RegionRoleState::Leader(RegionLeaderState::Staging)
);
return;
}
match self.state.fetch_update(|state| {
if !matches!(state, RegionRoleState::Follower) {
Some(RegionRoleState::Follower)
@@ -1071,6 +1141,20 @@ impl ManifestContext {
}
}
RegionRole::Leader => {
if self
.exit_staging(
region_id,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.is_ok()
{
info!(
"Convert region {} to leader, previous role state: {:?}",
region_id,
RegionRoleState::Leader(RegionLeaderState::Staging)
);
return;
}
match self.state.fetch_update(|state| {
if matches!(
state,
@@ -1096,7 +1180,27 @@ impl ManifestContext {
}
}
}
RegionRole::StagingLeader => {
info!(
"Ignore direct conversion of region {} to staging leader; staging requires the dedicated workflow",
region_id
);
}
RegionRole::DowngradingLeader => {
if self
.exit_staging(
region_id,
RegionRoleState::Leader(RegionLeaderState::Downgrading),
)
.is_ok()
{
info!(
"Convert region {} to downgrading region, previous role state: {:?}",
region_id,
RegionRoleState::Leader(RegionLeaderState::Staging)
);
return;
}
match self.state.compare_exchange(
RegionRoleState::Leader(RegionLeaderState::Writable),
RegionRoleState::Leader(RegionLeaderState::Downgrading),
@@ -1438,8 +1542,8 @@ pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<P
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;
@@ -1512,7 +1616,6 @@ mod tests {
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: ManifestStats::default(),
staging_partition_info: Mutex::new(None),
}
}
@@ -1684,6 +1787,13 @@ mod tests {
RegionRoleState::Leader(RegionLeaderState::Writable)
);
// Direct Leader -> StagingLeader should be ignored.
manifest_ctx.set_role(RegionRole::StagingLeader, region_id);
assert_eq!(
manifest_ctx.state.load(),
RegionRoleState::Leader(RegionLeaderState::Writable)
);
// Leader -> Downgrading Leader
manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
assert_eq!(
@@ -1825,7 +1935,6 @@ mod tests {
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: ManifestStats::default(),
staging_partition_info: Mutex::new(None),
};
// Test initial state

View File

@@ -17,7 +17,7 @@
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::{Arc, LazyLock, Mutex};
use std::sync::{Arc, LazyLock};
use std::time::Instant;
use common_telemetry::{debug, error, info, warn};
@@ -349,7 +349,6 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(0),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats,
staging_partition_info: Mutex::new(None),
}))
}
@@ -586,8 +585,6 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),
stats: self.stats.clone(),
// TODO(weny): reload the staging partition info from the manifest.
staging_partition_info: Mutex::new(None),
};
let region = Arc::new(region);

View File

@@ -75,7 +75,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
let staging_partition_info = region.manifest_ctx.staging_partition_info();
let staging_partition_expr = staging_partition_info
.as_ref()

View File

@@ -42,7 +42,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// If the region is already in staging mode, verify the partition directive matches.
if region.is_staging() {
let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
let staging_partition_info = region.manifest_ctx.staging_partition_info();
// If the partition directive mismatches, return error.
if staging_partition_info
.as_ref()
@@ -279,10 +279,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region: &MitoRegionRef,
partition_directive: StagingPartitionDirective,
) {
let mut staging_partition_info = region.staging_partition_info.lock().unwrap();
debug_assert!(staging_partition_info.is_none());
*staging_partition_info = Some(StagingPartitionInfo::from_partition_directive(
partition_directive,
));
region.manifest_ctx.set_staging_partition_info(
StagingPartitionInfo::from_partition_directive(partition_directive),
);
}
}

View File

@@ -67,7 +67,7 @@ impl From<SettableRegionRoleState> for RegionRole {
SettableRegionRoleState::Follower => RegionRole::Follower,
SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
SettableRegionRoleState::Leader => RegionRole::Leader,
SettableRegionRoleState::StagingLeader => RegionRole::Leader, // Still a leader role
SettableRegionRoleState::StagingLeader => RegionRole::StagingLeader,
}
}
}
@@ -210,6 +210,11 @@ pub enum RegionRole {
Follower,
// Writable region(mito2), Readonly region(file).
Leader,
// Leader is in staging mode.
//
// This is leader-like and writable, but it follows the staging workflow
// semantics instead of a normal leader's steady state.
StagingLeader,
// Leader is downgrading to follower.
//
// This state is used to prevent new write requests.
@@ -221,6 +226,7 @@ impl Display for RegionRole {
match self {
RegionRole::Follower => write!(f, "Follower"),
RegionRole::Leader => write!(f, "Leader"),
RegionRole::StagingLeader => write!(f, "Leader(Staging)"),
RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
}
}
@@ -228,7 +234,7 @@ impl Display for RegionRole {
impl RegionRole {
pub fn writable(&self) -> bool {
matches!(self, RegionRole::Leader)
matches!(self, RegionRole::Leader | RegionRole::StagingLeader)
}
}
@@ -237,6 +243,7 @@ impl From<RegionRole> for PbRegionRole {
match value {
RegionRole::Follower => PbRegionRole::Follower,
RegionRole::Leader => PbRegionRole::Leader,
RegionRole::StagingLeader => PbRegionRole::StagingLeader,
RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
}
}
@@ -246,6 +253,7 @@ impl From<PbRegionRole> for RegionRole {
fn from(value: PbRegionRole) -> Self {
match value {
PbRegionRole::Leader => RegionRole::Leader,
PbRegionRole::StagingLeader => RegionRole::StagingLeader,
PbRegionRole::Follower => RegionRole::Follower,
PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
}