feat: mito region staging state (#6664)

* fix: not mark all deleted when partial trunc (#6654)

* fix: not mark all deleted when partial trunc&not update manifest when partial file range is empty

Signed-off-by: discord9 <discord9@163.com>

* docs: note

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* some tests and DdlRequest

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* stage transit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* address CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* correct error type

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
This commit is contained in:
Ruihang Xia
2025-08-11 20:17:47 -07:00
committed by GitHub
parent f159fcf599
commit 25f926ea7d
19 changed files with 1123 additions and 63 deletions

View File

@@ -375,6 +375,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Region not found: {}", region_id))]
RegionNotFound {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("View not found: '{}'", view_name))]
ViewNotFound {
view_name: String,
@@ -1116,7 +1123,9 @@ impl ErrorExt for Error {
FlowRouteNotFound { .. } => StatusCode::Unexpected,
FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound,
ViewNotFound { .. } | TableNotFound { .. } | RegionNotFound { .. } => {
StatusCode::TableNotFound
}
ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
SubmitProcedure { source, .. }

View File

@@ -22,8 +22,8 @@ use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::error::{
InvalidMetadataSnafu, MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu,
InvalidMetadataSnafu, MetadataCorruptionSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
};
use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
use crate::key::txn_helper::TxnOpGetResponseSet;
@@ -455,6 +455,101 @@ impl TableRouteManager {
.transpose()
}
/// Sets the staging state for a specific region.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// - the table does not exist
/// - the region is not found in the table
pub async fn set_region_staging_state(
&self,
region_id: store_api::storage::RegionId,
staging: bool,
) -> Result<()> {
let table_id = region_id.table_id();
// Get current table route with raw bytes for CAS operation
let current_table_route = self
.storage
.get_with_raw_bytes(table_id)
.await?
.context(TableRouteNotFoundSnafu { table_id })?;
// Clone the current route value and update the specific region
let new_table_route = current_table_route.inner.clone();
// Only physical tables have region routes
ensure!(
new_table_route.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: format!("Cannot set staging state for logical table {table_id}"),
}
);
let region_routes = new_table_route.region_routes()?.clone();
let mut updated_routes = region_routes.clone();
// Find and update the specific region
// TODO(ruihang): maybe update them in one transaction
let mut region_found = false;
for route in &mut updated_routes {
if route.region.id == region_id {
if staging {
route.set_leader_staging();
} else {
route.clear_leader_staging();
}
region_found = true;
break;
}
}
ensure!(region_found, RegionNotFoundSnafu { region_id });
// Create new table route with updated region routes
let updated_table_route = new_table_route.update(updated_routes)?;
// Execute atomic update
let (txn, _) =
self.storage
.build_update_txn(table_id, &current_table_route, &updated_table_route)?;
let result = self.storage.kv_backend.txn(txn).await?;
ensure!(
result.succeeded,
MetadataCorruptionSnafu {
err_msg: format!(
"Failed to update staging state for region {}: CAS operation failed",
region_id
),
}
);
Ok(())
}
/// Checks if a specific region is in staging state.
///
/// Returns false if the table/region doesn't exist.
pub async fn is_region_staging(&self, region_id: store_api::storage::RegionId) -> Result<bool> {
let table_id = region_id.table_id();
let table_route = self.storage.get(table_id).await?;
match table_route {
Some(route) if route.is_physical() => {
let region_routes = route.region_routes()?;
for route in region_routes {
if route.region.id == region_id {
return Ok(route.is_leader_staging());
}
}
Ok(false)
}
_ => Ok(false),
}
}
/// Returns low-level APIs.
pub fn table_route_storage(&self) -> &TableRouteStorage {
&self.storage

View File

@@ -280,6 +280,11 @@ pub enum LeaderState {
/// - The [`Region`] was planned to migrate to another [`Peer`].
#[serde(alias = "Downgraded")]
Downgrading,
/// The [`Region`] is in staging mode.
///
/// Disables checkpoint and compaction while maintaining write capability.
/// But data ingested during this period are not visible to the user (hence staging).
Staging,
}
impl RegionRoute {
@@ -294,6 +299,11 @@ impl RegionRoute {
matches!(self.leader_state, Some(LeaderState::Downgrading))
}
/// Returns true if the Leader [`Region`] is in staging mode.
pub fn is_leader_staging(&self) -> bool {
matches!(self.leader_state, Some(LeaderState::Staging))
}
/// Marks the Leader [`Region`] as [`RegionState::Downgrading`].
///
/// We should downgrade a [`Region`] before deactivating it:
@@ -310,6 +320,21 @@ impl RegionRoute {
self.leader_state = Some(LeaderState::Downgrading)
}
/// Sets the Leader [`Region`] to staging mode.
pub fn set_leader_staging(&mut self) {
self.leader_state = Some(LeaderState::Staging);
// Reset leader_down_since as it's specific to downgrading
self.leader_down_since = None;
}
/// Clears the leader staging state, returning to normal leader mode.
pub fn clear_leader_staging(&mut self) {
if self.leader_state == Some(LeaderState::Staging) {
self.leader_state = None;
self.leader_down_since = None;
}
}
/// Returns how long since the leader is in `Downgraded` state.
pub fn leader_down_millis(&self) -> Option<i64> {
self.leader_down_since

View File

@@ -50,6 +50,15 @@ impl HandlerContext {
error: None,
}))
}
Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
error!(err; "Failed to convert region to follower - invalid transition");
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}))
}
Err(err) => {
error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
@@ -117,6 +126,15 @@ impl HandlerContext {
error: None,
}));
}
Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
error!(err; "Failed to convert region to downgrading leader - invalid transition");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
last_entry_id: None,
metadata_last_entry_id: None,
exists: true,
error: Some(format!("{err:?}")),
}));
}
Err(err) => {
error!(err; "Failed to convert region to downgrading leader");
return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {

View File

@@ -336,6 +336,9 @@ impl RegionEngine for MetricEngine {
SetRegionRoleStateResponse::NotFound => {
return Ok(SetRegionRoleStateResponse::NotFound)
}
SetRegionRoleStateResponse::InvalidTransition(error) => {
return Ok(SetRegionRoleStateResponse::InvalidTransition(error))
}
};
let data_result = match self
@@ -348,6 +351,9 @@ impl RegionEngine for MetricEngine {
SetRegionRoleStateResponse::NotFound => {
return Ok(SetRegionRoleStateResponse::NotFound)
}
SetRegionRoleStateResponse::InvalidTransition(error) => {
return Ok(SetRegionRoleStateResponse::InvalidTransition(error))
}
};
Ok(SetRegionRoleStateResponse::success(

View File

@@ -61,7 +61,7 @@ use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::ManifestContextRef;
use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
@@ -142,6 +142,17 @@ impl CompactionScheduler {
schema_metadata_manager: SchemaMetadataManagerRef,
max_parallelism: usize,
) -> Result<()> {
// skip compaction if region is in staging state
let current_state = manifest_ctx.current_state();
if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
info!(
"Skipping compaction for region {} in staging mode, options: {:?}",
region_id, compact_options
);
waiter.send(Ok(0));
return Ok(());
}
if let Some(status) = self.region_status.get_mut(&region_id) {
match compact_options {
Options::Regular(_) => {
@@ -526,14 +537,12 @@ impl CompactionStatus {
/// Set pending compaction request or replace current value if already exist.
fn set_pending_request(&mut self, pending: PendingCompaction) {
if let Some(mut prev) = self.pending_request.replace(pending) {
if let Some(prev) = self.pending_request.replace(pending) {
debug!(
"Replace pending compaction options with new request {:?} for region: {}",
prev.options, self.region_id
);
if let Some(waiter) = prev.waiter.take_inner() {
waiter.send(ManualCompactionOverrideSnafu.fail());
}
prev.waiter.send(ManualCompactionOverrideSnafu.fail());
}
}
@@ -741,9 +750,12 @@ struct PendingCompaction {
#[cfg(test)]
mod tests {
use api::v1::region::StrictWindow;
use common_datasource::compression::CompressionType;
use tokio::sync::oneshot;
use super::*;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::ManifestContext;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{apply_edit, VersionControlBuilder};
@@ -1046,4 +1058,59 @@ mod tests {
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_none());
}
#[tokio::test]
async fn test_compaction_bypass_in_staging_mode() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
// Create version control and manifest context for staging mode
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let region_id = version_control.current().version.metadata.region_id;
// Create staging manifest context using the same pattern as SchedulerEnv
let staging_manifest_ctx = {
let manager = RegionManifestManager::new(
version_control.current().version.metadata.clone(),
RegionManifestOptions {
manifest_dir: "".to_string(),
object_store: env.access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
},
Default::default(),
Default::default(),
)
.await
.unwrap();
Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Staging),
))
};
let (schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager();
// Test regular compaction bypass in staging mode
let (tx, rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&staging_manifest_ctx,
schema_metadata_manager,
1,
)
.await
.unwrap();
let result = rx.await.unwrap();
assert_eq!(result.unwrap(), 0); // is there a better way to check this?
assert_eq!(0, scheduler.region_status.len());
}
}

View File

@@ -57,6 +57,8 @@ mod scan_test;
#[cfg(test)]
mod set_role_state_test;
#[cfg(test)]
mod staging_test;
#[cfg(test)]
mod sync_test;
#[cfg(test)]
mod truncate_test;

View File

@@ -25,6 +25,36 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};
/// Helper function to assert a successful response with expected entry id
fn assert_success_response(response: &SetRegionRoleStateResponse, expected_entry_id: u64) {
match response {
SetRegionRoleStateResponse::Success(SetRegionRoleStateSuccess::Mito { last_entry_id }) => {
assert_eq!(*last_entry_id, expected_entry_id);
}
_ => panic!("Expected success response, got: {:?}", response),
}
}
/// Helper function to assert a NotFound response
fn assert_not_found_response(response: &SetRegionRoleStateResponse) {
match response {
SetRegionRoleStateResponse::NotFound => {
// Expected - do nothing
}
_ => panic!("Expected NotFound response, got: {:?}", response),
}
}
/// Helper function to assert an InvalidTransition response
fn assert_invalid_transition_response(response: &SetRegionRoleStateResponse) {
match response {
SetRegionRoleStateResponse::InvalidTransition(_) => {
// Expected - do nothing
}
_ => panic!("Expected InvalidTransition response, got: {:?}", response),
}
}
#[tokio::test]
async fn test_set_role_state_gracefully() {
let settable_role_states = [
@@ -48,20 +78,14 @@ async fn test_set_role_state_gracefully() {
.set_region_role_state_gracefully(region_id, settable_role_state)
.await
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(0)),
result
);
assert_success_response(&result, 0);
// set Follower again.
let result = engine
.set_region_role_state_gracefully(region_id, settable_role_state)
.await
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(0)),
result
);
assert_success_response(&result, 0);
let rows = Rows {
schema: column_schemas,
@@ -92,10 +116,7 @@ async fn test_set_role_state_gracefully() {
.await
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(1)),
result
);
assert_success_response(&result, 1);
}
}
@@ -111,7 +132,7 @@ async fn test_set_role_state_gracefully_not_exist() {
.set_region_role_state_gracefully(non_exist_region_id, SettableRegionRoleState::Follower)
.await
.unwrap();
assert_eq!(SetRegionRoleStateResponse::NotFound, result);
assert_not_found_response(&result);
}
#[tokio::test]
@@ -138,10 +159,7 @@ async fn test_write_downgrading_region() {
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
.await
.unwrap();
assert_eq!(
SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(1)),
result
);
assert_success_response(&result, 1);
let rows = Rows {
schema: column_schemas,
@@ -159,3 +177,185 @@ async fn test_write_downgrading_region() {
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotReady)
}
#[tokio::test]
async fn test_unified_state_transitions() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Test all transitions from normal leader state
// Leader -> StagingLeader -> Leader
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
assert_success_response(&result, 0);
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
assert_success_response(&result, 0);
// Leader -> StagingLeader -> Follower (exit staging via demotion)
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
.unwrap();
assert_success_response(&result, 0);
// Note: Direct Follower -> Leader promotion is no longer allowed
// Use existing set_region_role method for follower -> leader promotion
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
// Leader -> StagingLeader -> DowngradingLeader (exit staging via downgrade)
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
.await
.unwrap();
assert_success_response(&result, 0);
// Note: Direct DowngradingLeader -> Leader is no longer allowed
// Use existing set_region_role method for downgrading -> leader promotion
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
// Test idempotent operations (no-op cases)
// Leader -> Leader (should be no-op)
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
assert_success_response(&result, 0);
// StagingLeader -> StagingLeader (should be no-op)
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
assert_success_response(&result, 0);
// Back to follower for final test
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
.unwrap();
// Follower -> Follower (should be no-op)
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
.await
.unwrap();
assert_success_response(&result, 0);
}
#[tokio::test]
async fn test_restricted_state_transitions() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Test that Leader transition from follower is rejected with InvalidTransition
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
assert_invalid_transition_response(&result);
// Test that Leader transition from downgrading is rejected with InvalidTransition
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
engine
.set_region_role(region_id, RegionRole::DowngradingLeader)
.unwrap();
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
assert_invalid_transition_response(&result);
// Test that StagingLeader transition from follower is rejected with InvalidTransition
engine
.set_region_role(region_id, RegionRole::Follower)
.unwrap();
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
assert_invalid_transition_response(&result);
// Test that StagingLeader transition from downgrading is rejected with InvalidTransition
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
engine
.set_region_role(region_id, RegionRole::DowngradingLeader)
.unwrap();
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
assert_invalid_transition_response(&result);
// Test that valid staging workflow still works
engine
.set_region_role(region_id, RegionRole::Leader)
.unwrap();
// Writable Leader -> StagingLeader should work
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::StagingLeader)
.await
.unwrap();
assert_success_response(&result, 0);
// Staging -> Leader should work
let result = engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
assert_success_response(&result, 0);
}

View File

@@ -0,0 +1,180 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Integration tests for staging state functionality.
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionAlterRequest, RegionRequest, RegionTruncateRequest};
use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::request::WorkerRequest;
use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_staging_state_integration() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
// Create region
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Test external API patterns work correctly
use store_api::region_engine::SettableRegionRoleState;
let (role_req, _receiver) = WorkerRequest::new_set_readonly_gracefully(
region_id,
SettableRegionRoleState::StagingLeader,
);
match role_req {
WorkerRequest::SetRegionRoleStateGracefully {
region_id: req_region_id,
region_role_state,
..
} => {
assert_eq!(req_region_id, region_id);
assert_eq!(region_role_state, SettableRegionRoleState::StagingLeader);
}
_ => panic!("Expected SetRegionRoleStateGracefully request"),
}
let (role_req, _receiver) =
WorkerRequest::new_set_readonly_gracefully(region_id, SettableRegionRoleState::Leader);
match role_req {
WorkerRequest::SetRegionRoleStateGracefully {
region_id: req_region_id,
region_role_state,
..
} => {
assert_eq!(req_region_id, region_id);
assert_eq!(region_role_state, SettableRegionRoleState::Leader);
}
_ => panic!("Expected SetRegionRoleStateGracefully request"),
}
}
#[tokio::test]
async fn test_staging_blocks_alter_operations() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
// Create region
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Note: In the current implementation, we can't directly test staging mode
// through the engine interface since staging transitions are handled
// through worker requests. This test demonstrates the pattern that would
// be used once external control interfaces are implemented.
// Test that ALTER operations would be blocked in staging mode
let alter_request = RegionAlterRequest {
kind: store_api::region_request::AlterKind::AddColumns { columns: vec![] },
};
// This currently succeeds since we're not in staging mode
let result = engine
.handle_request(region_id, RegionRequest::Alter(alter_request))
.await;
assert!(result.is_ok(), "ALTER should succeed in normal mode");
}
#[tokio::test]
async fn test_staging_blocks_truncate_operations() {
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
// Create region
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Test that TRUNCATE operations would be blocked in staging mode
let truncate_request = RegionTruncateRequest::All;
// This currently succeeds since we're not in staging mode
let result = engine
.handle_request(region_id, RegionRequest::Truncate(truncate_request))
.await;
assert!(result.is_ok(), "TRUNCATE should succeed in normal mode");
}
#[tokio::test]
async fn test_staging_state_validation_patterns() {
use crate::region::{RegionLeaderState, RegionRoleState};
// Test the state validation patterns used throughout the codebase
let staging_state = RegionRoleState::Leader(RegionLeaderState::Staging);
let writable_state = RegionRoleState::Leader(RegionLeaderState::Writable);
// Test staging detection
let is_staging = staging_state == RegionRoleState::Leader(RegionLeaderState::Staging);
assert!(is_staging, "Should correctly identify staging state");
let is_not_staging = writable_state == RegionRoleState::Leader(RegionLeaderState::Staging);
assert!(
!is_not_staging,
"Should correctly identify non-staging state"
);
// Test writable state check
let staging_is_writable = matches!(
staging_state,
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
);
assert!(staging_is_writable, "Staging regions should be writable");
let writable_is_writable = matches!(
writable_state,
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
);
assert!(writable_is_writable, "Writable regions should be writable");
// Test flushable state check
let staging_is_flushable = matches!(
staging_state,
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
| RegionRoleState::Leader(RegionLeaderState::Downgrading)
);
assert!(staging_is_flushable, "Staging regions should be flushable");
let writable_is_flushable = matches!(
writable_state,
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
| RegionRoleState::Leader(RegionLeaderState::Downgrading)
);
assert!(
writable_is_flushable,
"Writable regions should be flushable"
);
}

View File

@@ -24,6 +24,7 @@ use crate::manifest::action::{RegionCheckpoint, RegionManifest};
use crate::manifest::manager::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{RegionLeaderState, RegionRoleState};
/// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way.
#[derive(Debug)]
@@ -120,7 +121,20 @@ impl Checkpointer {
/// Check if it's needed to do checkpoint for the region by the checkpoint distance.
/// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
/// task running in the background.
pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
pub(crate) fn maybe_do_checkpoint(
&self,
manifest: &RegionManifest,
region_state: RegionRoleState,
) {
// Skip checkpoint if region is in staging state
if region_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
info!(
"Skipping checkpoint for region {} in staging mode, manifest version: {}",
manifest.metadata.region_id, manifest.manifest_version
);
return;
}
if self.manifest_options.checkpoint_distance == 0 {
return;
}

View File

@@ -35,6 +35,9 @@ use crate::manifest::storage::{
file_version, is_checkpoint_file, is_delta_file, ManifestObjectStore,
};
use crate::metrics::MANIFEST_OP_ELAPSED;
#[cfg(test)]
use crate::region::RegionLeaderState;
use crate::region::RegionRoleState;
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
@@ -434,7 +437,11 @@ impl RegionManifestManager {
}
/// Updates the manifest. Returns the current manifest version number.
pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
pub async fn update(
&mut self,
action_list: RegionMetaActionList,
region_state: RegionRoleState,
) -> Result<ManifestVersion> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["update"])
.start_timer();
@@ -474,7 +481,7 @@ impl RegionManifestManager {
self.manifest = Arc::new(new_manifest);
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref());
.maybe_do_checkpoint(self.manifest.as_ref(), region_state);
Ok(version)
}
@@ -657,7 +664,13 @@ mod test {
metadata: new_metadata.clone(),
}));
let current_version = manager.update(action_list).await.unwrap();
let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);
@@ -719,7 +732,13 @@ mod test {
metadata: new_metadata.clone(),
}));
let current_version = manager.update(action_list).await.unwrap();
let current_version = manager
.update(
action_list,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1);
@@ -730,15 +749,16 @@ mod test {
// update 10 times nop_action to trigger checkpoint
for _ in 0..10 {
manager
.update(RegionMetaActionList::new(vec![RegionMetaAction::Edit(
RegionEdit {
.update(
RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
)]))
})]),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
}

View File

@@ -26,6 +26,7 @@ use crate::manifest::action::{
};
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::TestEnv;
@@ -83,7 +84,13 @@ async fn manager_without_checkpoint() {
// apply 10 actions
for _ in 0..10 {
manager.update(nop_action()).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
}
// no checkpoint
@@ -126,7 +133,13 @@ async fn manager_with_checkpoint_distance_1() {
// apply 10 actions
for _ in 0..10 {
manager.update(nop_action()).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -183,7 +196,13 @@ async fn test_corrupted_data_causing_checksum_error() {
// Apply actions
for _ in 0..10 {
manager.update(nop_action()).await.unwrap();
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
}
// Wait for the checkpoint to finish.
@@ -266,7 +285,10 @@ async fn generate_checkpoint_with_compression_types(
let (_env, mut manager) = build_manager(1, compress_type).await;
for action in actions {
manager.update(action).await.unwrap();
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -316,7 +338,10 @@ async fn manifest_install_manifest_to() {
let (env, mut manager) = build_manager(0, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager.update(action).await.unwrap();
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
}
// Nothing to install
@@ -354,7 +379,10 @@ async fn manifest_install_manifest_to_with_checkpoint() {
let (env, mut manager) = build_manager(3, CompressionType::Uncompressed).await;
let (files, actions) = generate_action_lists(10);
for action in actions {
manager.update(action).await.unwrap();
manager
.update(action, RegionRoleState::Leader(RegionLeaderState::Writable))
.await
.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -416,3 +444,55 @@ async fn manifest_install_manifest_to_with_checkpoint() {
}
assert!(another_manager.store().total_manifest_size() > 4000);
}
#[tokio::test]
async fn test_checkpoint_bypass_in_staging_mode() {
common_telemetry::init_default_ut_logging();
let (_env, mut manager) = build_manager(1, CompressionType::Uncompressed).await;
// Apply actions in staging mode - checkpoint should be bypassed
for _ in 0..15 {
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Staging),
)
.await
.unwrap();
}
assert!(!manager.checkpointer().is_doing_checkpoint());
// Verify no checkpoint was created in staging mode
assert!(manager
.store()
.load_last_checkpoint()
.await
.unwrap()
.is_none());
// Now switch to normal mode and apply one more action
manager
.update(
nop_action(),
RegionRoleState::Leader(RegionLeaderState::Writable),
)
.await
.unwrap();
// Wait for potential checkpoint
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
// Now checkpoint should exist because we switched to writable mode
let checkpoint_result = manager.store().load_last_checkpoint().await.unwrap();
assert!(
checkpoint_result.is_some(),
"Checkpoint should exist after switching to writable mode"
);
let (last_version, _checkpoint_data) = checkpoint_result.unwrap();
// Checkpoint should include all 16 actions (15 from staging + 1 from writable)
assert_eq!(last_version, 16);
}

View File

@@ -70,6 +70,8 @@ impl RegionUsage {
pub enum RegionLeaderState {
/// The region is opened and is writable.
Writable,
/// The region is in staging mode - writable but no checkpoint/compaction.
Staging,
/// The region is altering.
Altering,
/// The region is dropping.
@@ -200,7 +202,11 @@ impl MitoRegion {
/// Returns whether the region is writable.
pub(crate) fn is_writable(&self) -> bool {
self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Writable)
matches!(
self.manifest_ctx.state.load(),
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
)
}
/// Returns whether the region is flushable.
@@ -208,6 +214,7 @@ impl MitoRegion {
matches!(
self.manifest_ctx.state.load(),
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
| RegionRoleState::Leader(RegionLeaderState::Downgrading)
)
}
@@ -220,6 +227,12 @@ impl MitoRegion {
)
}
/// Returns whether the region is in staging mode.
#[allow(dead_code)]
pub(crate) fn is_staging(&self) -> bool {
self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
}
pub fn region_id(&self) -> RegionId {
self.region_id
}
@@ -279,12 +292,134 @@ impl MitoRegion {
)
}
/// Sets the region to readonly gracefully. This acquires the manifest write lock.
pub(crate) async fn set_role_state_gracefully(&self, state: SettableRegionRoleState) {
/// Sets the staging state.
/// You should call this method in the worker loop.
/// Transitions from Writable to Staging state.
pub(crate) fn set_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Writable,
RegionRoleState::Leader(RegionLeaderState::Staging),
)
}
/// Exits the staging state back to writable.
/// You should call this method in the worker loop.
/// Transitions from Staging to Writable state.
pub(crate) fn exit_staging(&self) -> Result<()> {
self.compare_exchange_state(
RegionLeaderState::Staging,
RegionRoleState::Leader(RegionLeaderState::Writable),
)
}
/// Sets the region role state gracefully. This acquires the manifest write lock.
pub(crate) async fn set_role_state_gracefully(
&self,
state: SettableRegionRoleState,
) -> Result<()> {
let _manager = self.manifest_ctx.manifest_manager.write().await;
// We acquires the write lock of the manifest manager to ensure that no one is updating the manifest.
// Then we change the state.
self.set_role(state.into());
let current_state = self.state();
match state {
SettableRegionRoleState::Leader => {
// Exit staging mode and return to normal writable leader
// Only allowed from staging state
match current_state {
RegionRoleState::Leader(RegionLeaderState::Staging) => {
info!("Exiting staging mode for region {}", self.region_id);
self.exit_staging()?;
}
RegionRoleState::Leader(RegionLeaderState::Writable) => {
// Already in desired state - no-op
info!("Region {} already in normal leader mode", self.region_id);
}
_ => {
// Only staging -> leader transition is allowed
return Err(RegionStateSnafu {
region_id: self.region_id,
state: current_state,
expect: RegionRoleState::Leader(RegionLeaderState::Staging),
}
.build());
}
}
}
SettableRegionRoleState::StagingLeader => {
// Enter staging mode from normal writable leader
// Only allowed from writable leader state
match current_state {
RegionRoleState::Leader(RegionLeaderState::Writable) => {
info!("Entering staging mode for region {}", self.region_id);
self.set_staging()?;
}
RegionRoleState::Leader(RegionLeaderState::Staging) => {
// Already in desired state - no-op
info!("Region {} already in staging mode", self.region_id);
}
_ => {
return Err(RegionStateSnafu {
region_id: self.region_id,
state: current_state,
expect: RegionRoleState::Leader(RegionLeaderState::Writable),
}
.build());
}
}
}
SettableRegionRoleState::Follower => {
// Make this region a follower
match current_state {
RegionRoleState::Leader(RegionLeaderState::Staging) => {
info!(
"Exiting staging and demoting region {} to follower",
self.region_id
);
self.exit_staging()?;
self.set_role(RegionRole::Follower);
}
RegionRoleState::Leader(_) => {
info!("Demoting region {} from leader to follower", self.region_id);
self.set_role(RegionRole::Follower);
}
RegionRoleState::Follower => {
// Already in desired state - no-op
info!("Region {} already in follower mode", self.region_id);
}
}
}
SettableRegionRoleState::DowngradingLeader => {
// downgrade this region to downgrading leader
match current_state {
RegionRoleState::Leader(RegionLeaderState::Staging) => {
info!(
"Exiting staging and entering downgrade for region {}",
self.region_id
);
self.exit_staging()?;
self.set_role(RegionRole::DowngradingLeader);
}
RegionRoleState::Leader(RegionLeaderState::Writable) => {
info!("Starting downgrade for region {}", self.region_id);
self.set_role(RegionRole::DowngradingLeader);
}
RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
// Already in desired state - no-op
info!("Region {} already in downgrading mode", self.region_id);
}
_ => {
warn!(
"Cannot start downgrade for region {} from state {:?}",
self.region_id, current_state
);
}
}
}
}
Ok(())
}
/// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
@@ -390,6 +525,11 @@ impl ManifestContext {
self.manifest_manager.read().await.has_update().await
}
/// Returns the current region role state.
pub(crate) fn current_state(&self) -> RegionRoleState {
self.state.load()
}
/// Installs the manifest changes from the current version to the target version (inclusive).
///
/// Returns installed [RegionManifest].
@@ -485,7 +625,7 @@ impl ManifestContext {
}
// Now we can update the manifest.
let version = manager.update(action_list).await.inspect_err(
let version = manager.update(action_list, current_state).await.inspect_err(
|e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
)?;
@@ -701,6 +841,22 @@ impl RegionMap {
}
}
/// Gets writable non-staging region by region id.
///
/// Returns error if the region does not exist, is readonly, or is in staging mode.
pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
let region = self.writable_region(region_id)?;
if region.is_staging() {
return Err(crate::error::RegionStateSnafu {
region_id,
state: region.state(),
expect: RegionRoleState::Leader(RegionLeaderState::Writable),
}
.build());
}
Ok(region)
}
/// Gets flushable region by region id.
///
/// Returns error if the region does not exist or is not operable.
@@ -830,13 +986,27 @@ impl ManifestStats {
mod tests {
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::create_temp_dir;
use crossbeam_utils::atomic::AtomicCell;
use object_store::services::Fs;
use object_store::ObjectStore;
use store_api::logstore::provider::Provider;
use store_api::region_engine::RegionRole;
use store_api::region_request::PathType;
use store_api::storage::RegionId;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::access_layer::AccessLayer;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{
ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::version_util::VersionControlBuilder;
use crate::time_provider::StdTimeProvider;
#[test]
fn test_region_state_lock_free() {
@@ -894,4 +1064,140 @@ mod tests {
RegionRoleState::Leader(RegionLeaderState::Writable)
);
}
#[tokio::test]
async fn test_staging_state_validation() {
let env = SchedulerEnv::new().await;
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
// Create context with staging state using the correct pattern from SchedulerEnv
let staging_ctx = {
let manager = RegionManifestManager::new(
version_control.current().version.metadata.clone(),
RegionManifestOptions {
manifest_dir: "".to_string(),
object_store: env.access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
},
Default::default(),
Default::default(),
)
.await
.unwrap();
Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Staging),
))
};
// Test staging state behavior
assert_eq!(
staging_ctx.current_state(),
RegionRoleState::Leader(RegionLeaderState::Staging)
);
// Test writable context for comparison
let writable_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;
assert_eq!(
writable_ctx.current_state(),
RegionRoleState::Leader(RegionLeaderState::Writable)
);
}
#[tokio::test]
async fn test_staging_state_transitions() {
let builder = VersionControlBuilder::new();
let version_control = Arc::new(builder.build());
let metadata = version_control.current().version.metadata.clone();
// Create MitoRegion for testing state transitions
let temp_dir = create_temp_dir("");
let path_str = temp_dir.path().display().to_string();
let fs_builder = Fs::default().root(&path_str);
let object_store = ObjectStore::new(fs_builder).unwrap().finish();
let index_aux_path = temp_dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let access_layer = Arc::new(AccessLayer::new(
"",
PathType::Bare,
object_store,
puffin_mgr,
intm_mgr,
));
let manager = RegionManifestManager::new(
metadata.clone(),
RegionManifestOptions {
manifest_dir: "".to_string(),
object_store: access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
},
Default::default(),
Default::default(),
)
.await
.unwrap();
let manifest_ctx = Arc::new(ManifestContext::new(
manager,
RegionRoleState::Leader(RegionLeaderState::Writable),
));
let region = MitoRegion {
region_id: metadata.region_id,
version_control,
access_layer,
manifest_ctx,
file_purger: crate::test_util::new_noop_file_purger(),
provider: Provider::noop_provider(),
last_flush_millis: Default::default(),
last_compaction_millis: Default::default(),
time_provider: Arc::new(StdTimeProvider),
topic_latest_entry_id: Default::default(),
memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
stats: ManifestStats::default(),
};
// Test initial state
assert_eq!(
region.state(),
RegionRoleState::Leader(RegionLeaderState::Writable)
);
assert!(!region.is_staging());
// Test transition to staging
region.set_staging().unwrap();
assert_eq!(
region.state(),
RegionRoleState::Leader(RegionLeaderState::Staging)
);
assert!(region.is_staging());
// Test transition back to writable
region.exit_staging().unwrap();
assert_eq!(
region.state(),
RegionRoleState::Leader(RegionLeaderState::Writable)
);
assert!(!region.is_staging());
// Test invalid transitions
assert!(region.set_staging().is_ok()); // Writable -> Staging should work
assert!(region.set_staging().is_err()); // Staging -> Staging should fail
assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
}
}

View File

@@ -203,8 +203,9 @@ impl RegionOpener {
&expect.column_metadatas,
&expect.primary_key,
)?;
// To keep consistence with Create behavior, set the opened Region to RegionRole::Leader.
// To keep consistency with Create behavior, set the opened Region to RegionRole::Leader.
region.set_role(RegionRole::Leader);
return Ok(region);
}
Ok(None) => {

View File

@@ -34,6 +34,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
@@ -1003,12 +1004,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if let Some(region) = self.regions.get_region(region_id) {
// We need to do this in background as we need the manifest lock.
common_runtime::spawn_global(async move {
region.set_role_state_gracefully(region_role_state).await;
let last_entry_id = region.version_control.current().last_entry_id;
let _ = sender.send(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(last_entry_id),
));
match region.set_role_state_gracefully(region_role_state).await {
Ok(()) => {
let last_entry_id = region.version_control.current().last_entry_id;
let _ = sender.send(SetRegionRoleStateResponse::success(
SetRegionRoleStateSuccess::mito(last_entry_id),
));
}
Err(e) => {
error!(e; "Failed to set region {} role state to {:?}", region_id, region_role_state);
let _ = sender.send(SetRegionRoleStateResponse::invalid_transition(
BoxedError::new(e),
));
}
}
});
} else {
let _ = sender.send(SetRegionRoleStateResponse::NotFound);

View File

@@ -45,10 +45,14 @@ impl<S> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
request: RegionAlterRequest,
mut sender: OptionOutputTx,
sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
let region = match self.regions.writable_non_staging_region(region_id) {
Ok(region) => region,
Err(e) => {
sender.send(Err(e));
return;
}
};
info!("Try to alter region: {}, request: {:?}", region_id, request);

View File

@@ -42,7 +42,7 @@ where
&mut self,
region_id: RegionId,
) -> Result<AffectedRows> {
let region = self.regions.writable_region(region_id)?;
let region = self.regions.writable_non_staging_region(region_id)?;
info!("Try to drop region: {}, worker: {}", region_id, self.id);

View File

@@ -30,11 +30,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
&mut self,
region_id: RegionId,
req: RegionTruncateRequest,
mut sender: OptionOutputTx,
sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
let region = match self.regions.writable_non_staging_region(region_id) {
Ok(region) => region,
Err(e) => {
sender.send(Err(e));
return;
}
};
let version_data = region.version_control.current();
match req {

View File

@@ -44,6 +44,10 @@ use crate::storage::{RegionId, ScanRequest, SequenceNumber};
pub enum SettableRegionRoleState {
Follower,
DowngradingLeader,
/// Exit staging mode and return to normal leader state. Only allowed from staging state.
Leader,
/// Enter staging mode. Region remains writable but disables checkpoint and compaction. Only allowed from normal leader state.
StagingLeader,
}
impl Display for SettableRegionRoleState {
@@ -51,6 +55,8 @@ impl Display for SettableRegionRoleState {
match self {
SettableRegionRoleState::Follower => write!(f, "Follower"),
SettableRegionRoleState::DowngradingLeader => write!(f, "Leader(Downgrading)"),
SettableRegionRoleState::Leader => write!(f, "Leader"),
SettableRegionRoleState::StagingLeader => write!(f, "Leader(Staging)"),
}
}
}
@@ -60,6 +66,8 @@ impl From<SettableRegionRoleState> for RegionRole {
match value {
SettableRegionRoleState::Follower => RegionRole::Follower,
SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
SettableRegionRoleState::Leader => RegionRole::Leader,
SettableRegionRoleState::StagingLeader => RegionRole::Leader, // Still a leader role
}
}
}
@@ -128,10 +136,11 @@ impl SetRegionRoleStateSuccess {
}
/// The response of setting region role state.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
pub enum SetRegionRoleStateResponse {
Success(SetRegionRoleStateSuccess),
NotFound,
InvalidTransition(BoxedError),
}
impl SetRegionRoleStateResponse {
@@ -140,10 +149,20 @@ impl SetRegionRoleStateResponse {
Self::Success(success)
}
/// Returns a [SetRegionRoleStateResponse::InvalidTransition] with the error.
pub fn invalid_transition(error: BoxedError) -> Self {
Self::InvalidTransition(error)
}
/// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
pub fn is_not_found(&self) -> bool {
matches!(self, SetRegionRoleStateResponse::NotFound)
}
/// Returns true if the response is a [SetRegionRoleStateResponse::InvalidTransition].
pub fn is_invalid_transition(&self) -> bool {
matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
}
}
#[derive(Debug, Clone, PartialEq, Eq)]