fix: ensure table route metadata is eventually rolled back on failure (#5174)

* fix: ensure table route metadata is eventually rolled back on procedure failure

* fix(fuzz): enhance procedure condition checking

* chore: add logs

* feat: close downgraded leader region actively

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-12-19 11:29:34 +08:00
committed by GitHub
parent c9ad8c7101
commit 66f0581f5b
10 changed files with 274 additions and 71 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
@@ -43,6 +44,7 @@ use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
use common_telemetry::info;
use manager::RegionMigrationProcedureGuard;
pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
@@ -91,7 +93,9 @@ impl PersistentContext {
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
TableLock::Read(region_id.table_id()).into(),
// Optimistic updates of the table route have proven unreliable,
// so the table write lock is taken here.
TableLock::Write(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];
@@ -253,7 +257,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
@@ -317,7 +321,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableInfo: {table_id}"),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?;
@@ -350,7 +354,7 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.context(error::RetryLaterWithSourceSnafu {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
})?
.context(error::DatanodeTableNotFoundSnafu {
@@ -468,6 +472,48 @@ impl RegionMigrationProcedure {
_guard: guard,
})
}
async fn rollback_inner(&mut self) -> Result<()> {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();
let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
self.context.remove_table_route_value();
let table_metadata_manager = self.context.table_metadata_manager.clone();
let table_route = self.context.get_table_route_value().await?;
// Safety: It must be a physical table route.
let downgraded = table_route
.region_routes()
.unwrap()
.iter()
.filter(|route| route.region.id == region_id)
.any(|route| route.is_leader_downgrading());
if downgraded {
info!("Rollbacking downgraded region leader table route, region: {region_id}");
table_metadata_manager
.update_leader_region_status(table_id, table_route, |route| {
if route.region.id == region_id {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
})?;
}
self.context.register_failure_detectors().await;
Ok(())
}
}
#[async_trait::async_trait]
@@ -476,6 +522,16 @@ impl Procedure for RegionMigrationProcedure {
Self::TYPE_NAME
}
/// Rolls the procedure back by delegating to `rollback_inner`.
///
/// # Errors
/// Any error from `rollback_inner` is converted with
/// `ProcedureError::external`.
async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
    let outcome = self.rollback_inner().await;
    outcome.map_err(ProcedureError::external)
}
/// Always returns `true`: this procedure provides a `rollback` implementation.
fn rollback_supported(&self) -> bool {
true
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
@@ -701,6 +757,12 @@ mod tests {
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the region migration end",
None,
@@ -1071,6 +1133,12 @@ mod tests {
Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
),
// UpdateMetadata::Upgrade
Step::next(
"Should be the close downgraded region",
None,
Assertion::simple(assert_close_downgraded_region, assert_no_persist),
),
// CloseDowngradedRegion
Step::next(
"Should be the region migration end",
None,

View File

@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::MAILBOX_RTT_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Timeout handed to the mailbox when sending the close-region instruction
/// (`MAILBOX_RTT_SECS` seconds).
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(MAILBOX_RTT_SECS);
/// Migration state that asks the `from_peer` datanode to close the downgraded
/// leader region; its `next` always hands over to `RegionMigrationEnd`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for CloseDowngradedRegion {
    /// Tries to close the downgraded leader region, then moves on to
    /// [`RegionMigrationEnd`].
    ///
    /// Closing is best-effort: a failure is logged as a warning and does not
    /// fail the state, so this method never returns `Err` itself.
    async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
        let outcome = self.close_downgraded_leader_region(ctx).await;
        if let Err(err) = outcome {
            let region_id = ctx.region_id();
            let from_peer = &ctx.persistent_ctx.from_peer;
            warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", from_peer);
        }

        let end: Box<dyn State> = Box::new(RegionMigrationEnd);
        Ok((end, Status::done()))
    }

    /// Exposes the state as [`Any`] so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl CloseDowngradedRegion {
    /// Builds the [`Instruction::CloseRegion`] instruction for the migrating
    /// region on the `from_peer` datanode.
    ///
    /// Abort(non-retry):
    /// - Datanode Table is not found.
    async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
        let pc = &ctx.persistent_ctx;
        let downgrade_leader_datanode_id = pc.from_peer.id;
        let cluster_id = pc.cluster_id;
        let table_id = pc.region_id.table_id();
        let region_number = pc.region_id.region_number();
        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
        // Only the engine name is needed, so clone that single field instead
        // of the whole `RegionInfo`.
        let engine = datanode_table_value.region_info.engine.clone();

        Ok(Instruction::CloseRegion(RegionIdent {
            cluster_id,
            datanode_id: downgrade_leader_datanode_id,
            table_id,
            region_number,
            engine,
        }))
    }

    /// Closes the downgraded leader region.
    ///
    /// Sends the close-region instruction to the `from_peer` datanode through
    /// the mailbox and waits for the reply.
    ///
    /// # Errors
    /// Fails when the instruction cannot be built, serialized or sent, when
    /// waiting for the reply fails, when the reply is not a close-region
    /// reply, or when the datanode reports that the region was not closed.
    async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
        let close_instruction = self.build_close_region_instruction(ctx).await?;
        let region_id = ctx.region_id();
        let pc = &ctx.persistent_ctx;
        let downgrade_leader_datanode = &pc.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Close downgraded region: {}", region_id),
            &format!("Meta@{}", ctx.server_addr()),
            &format!(
                "Datanode-{}@{}",
                downgrade_leader_datanode.id, downgrade_leader_datanode.addr
            ),
            common_time::util::current_time_millis(),
            &close_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: close_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(downgrade_leader_datanode.id);
        let receiver = ctx
            .mailbox
            .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
            .await?;

        // Both awaiting the receiver and the delivered result can carry an
        // error; either one is propagated unchanged.
        let msg = receiver.await??;
        let reply = HeartbeatMailbox::json_reply(&msg)?;
        info!(
            "Received close downgraded leader region reply: {:?}, region: {}",
            reply, region_id
        );
        let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
            return error::UnexpectedInstructionReplySnafu {
                mailbox_message: msg.to_string(),
                reason: "expect close region reply",
            }
            .fail();
        };

        if result {
            Ok(())
        } else {
            error::UnexpectedSnafu {
                violated: format!(
                    "Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}",
                    downgrade_leader_datanode,
                ),
            }
            .fail()
        }
    }
}

View File

@@ -21,11 +21,11 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
/// The behaviors:

View File

@@ -25,9 +25,9 @@ use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -145,7 +145,10 @@ impl OpenCandidateRegion {
match receiver.await? {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!("Received open region reply: {:?}", reply);
info!(
"Received open region reply: {:?}, region: {}",
reply, region_id
);
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),

View File

@@ -44,19 +44,21 @@ use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use tokio::sync::mpsc::{Receiver, Sender};
use super::manager::RegionMigrationProcedureTracker;
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::manager::RegionMigrationProcedureTracker;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::PersistentContext;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{
Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext,
};
use crate::service::mailbox::{Channel, MailboxRef};
pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
@@ -569,6 +571,14 @@ pub(crate) fn assert_region_migration_end(next: &dyn State) {
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}
/// Asserts the [State] should be [CloseDowngradedRegion].
///
/// # Panics
/// Panics when `next` is any other state.
pub(crate) fn assert_close_downgraded_region(next: &dyn State) {
    let downcasted = next.as_any().downcast_ref::<CloseDowngradedRegion>();
    let _ = downcasted.unwrap();
}
/// Asserts the [State] should be [RegionMigrationAbort].
pub(crate) fn assert_region_migration_abort(next: &dyn State) {
let _ = next

View File

@@ -22,10 +22,10 @@ use common_procedure::Status;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use crate::error::Result;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
@@ -58,7 +58,7 @@ impl State for UpdateMetadata {
if let Err(err) = ctx.invalidate_table_cache().await {
warn!("Failed to broadcast the invalidate table cache message during the upgrade candidate, error: {err:?}");
};
Ok((Box::new(RegionMigrationEnd), Status::done()))
Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;

View File

@@ -195,7 +195,7 @@ mod tests {
use store_api::storage::RegionId;
use crate::error::Error;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
@@ -443,7 +443,7 @@ mod tests {
}
#[tokio::test]
async fn test_next_migration_end_state() {
async fn test_next_close_downgraded_region_state() {
let mut state = Box::new(UpdateMetadata::Upgrade);
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
@@ -471,7 +471,10 @@ mod tests {
let (next, _) = state.next(&mut ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
let _ = next
.as_any()
.downcast_ref::<CloseDowngradedRegion>()
.unwrap();
let table_route = table_metadata_manager
.table_route_manager()

View File

@@ -23,9 +23,9 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::time::{sleep, Instant};
use super::update_metadata::UpdateMetadata;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
@@ -155,7 +155,7 @@ impl UpgradeCandidateRegion {
exists,
error::UnexpectedSnafu {
violated: format!(
"Expected region {} doesn't exist on datanode {:?}",
"Candidate region {} doesn't exist on datanode {:?}",
region_id, candidate
)
}

View File

@@ -229,6 +229,29 @@ async fn create_logical_table_and_insert_values(
Ok(())
}
/// Waits until `migration` is observed as finished.
///
/// Each check reads the state of procedure `procedure_id` and the current
/// partition of the migrated region. The condition holds once the region is
/// on `migration.to_peer` and the procedure state contains `"Done"`.
///
/// The two durations are passed straight to `wait_condition_fn`
/// (presumably the overall timeout and the polling interval — confirm
/// against its definition).
async fn wait_for_migration(ctx: &FuzzContext, migration: &Migration, procedure_id: &str) {
    info!("Waits for migration: {migration:?}");
    let region_id = migration.region_id.as_u64();
    wait_condition_fn(
        Duration::from_secs(120),
        || {
            // Each check owns its own client handle and procedure id.
            let client = ctx.greptime.clone();
            let procedure_id = procedure_id.to_owned();
            Box::pin(async move {
                let output = procedure_state(&client, &procedure_id).await;
                info!("Checking procedure: {procedure_id}, output: {output}");
                let partition = fetch_partition(&client, region_id).await.unwrap();
                (partition, output)
            })
        },
        |(current, output)| {
            info!("Region: {region_id}, datanode: {}", current.datanode_id);
            let on_target = current.datanode_id == migration.to_peer;
            let finished = output.contains("Done");
            on_target && finished
        },
        Duration::from_secs(1),
    )
    .await;
}
async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
let mut rng = ChaCha20Rng::seed_from_u64(input.seed);
// Creates a physical table.
@@ -297,28 +320,7 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
}
info!("Excepted new region distribution: {new_distribution:?}");
for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) {
info!("Waits for migration: {migration:?}");
let region_id = migration.region_id.as_u64();
wait_condition_fn(
Duration::from_secs(120),
|| {
let greptime = ctx.greptime.clone();
let procedure_id = procedure_id.to_string();
Box::pin(async move {
{
let output = procedure_state(&greptime, &procedure_id).await;
info!("Checking procedure: {procedure_id}, output: {output}");
fetch_partition(&greptime, region_id).await.unwrap()
}
})
},
|partition| {
info!("Region: {region_id}, datanode: {}", partition.datanode_id);
partition.datanode_id == migration.to_peer
},
Duration::from_secs(1),
)
.await;
wait_for_migration(&ctx, &migration, &procedure_id).await;
}
// Validates value rows
@@ -388,29 +390,8 @@ async fn execute_migration(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
procedure_ids.push(procedure_id);
}
info!("Excepted new region distribution: {new_distribution:?}");
for (migration, procedure_id) in migrations.into_iter().zip(procedure_ids) {
info!("Waits for migration: {migration:?}");
let region_id = migration.region_id.as_u64();
wait_condition_fn(
Duration::from_secs(120),
|| {
let greptime = ctx.greptime.clone();
let procedure_id = procedure_id.to_string();
Box::pin(async move {
{
let output = procedure_state(&greptime, &procedure_id).await;
info!("Checking procedure: {procedure_id}, output: {output}");
fetch_partition(&greptime, region_id).await.unwrap()
}
})
},
|partition| {
info!("Region: {region_id}, datanode: {}", partition.datanode_id);
partition.datanode_id == migration.to_peer
},
Duration::from_secs(1),
)
.await;
for (migration, procedure_id) in migrations.clone().into_iter().zip(procedure_ids) {
wait_for_migration(&ctx, &migration, &procedure_id).await;
}
// Creates more logical tables and inserts values

View File

@@ -248,13 +248,13 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
{
let output = procedure_state(&greptime, &procedure_id).await;
info!("Checking procedure: {procedure_id}, output: {output}");
fetch_partition(&greptime, region_id).await.unwrap()
(fetch_partition(&greptime, region_id).await.unwrap(), output)
}
})
},
|partition| {
|(partition, output)| {
info!("Region: {region_id}, datanode: {}", partition.datanode_id);
partition.datanode_id == migration.to_peer
partition.datanode_id == migration.to_peer && output.contains("Done")
},
Duration::from_secs(5),
)