mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 14:22:58 +00:00
feat: add update metadata step for downgrading leader region (#2771)
This commit is contained in:
@@ -631,7 +631,7 @@ impl TableMetadataManager {
|
||||
pub async fn update_leader_region_status<F>(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
current_table_route_value: DeserializedValueWithBytes<TableRouteValue>,
|
||||
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
|
||||
next_region_route_status: F,
|
||||
) -> Result<()>
|
||||
where
|
||||
@@ -658,7 +658,7 @@ impl TableMetadataManager {
|
||||
|
||||
let (update_table_route_txn, on_update_table_route_failure) = self
|
||||
.table_route_manager()
|
||||
.build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?;
|
||||
.build_update_txn(table_id, current_table_route_value, &new_table_route_value)?;
|
||||
|
||||
let r = self.kv_backend.txn(update_table_route_txn).await?;
|
||||
|
||||
@@ -1094,7 +1094,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
table_metadata_manager
|
||||
.update_leader_region_status(table_id, current_table_route_value, |region_route| {
|
||||
.update_leader_region_status(table_id, &current_table_route_value, |region_route| {
|
||||
if region_route.leader_status.is_some() {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -58,7 +58,7 @@ impl DeactivateRegion {
|
||||
.context(error::TableRouteNotFoundSnafu { table_id })?;
|
||||
|
||||
ctx.table_metadata_manager
|
||||
.update_leader_region_status(table_id, table_route_value, |region| {
|
||||
.update_leader_region_status(table_id, &table_route_value, |region| {
|
||||
if region.region.id.region_number() == failed_region.region_number {
|
||||
Some(Some(RegionStatus::Downgraded))
|
||||
} else {
|
||||
|
||||
@@ -18,11 +18,13 @@ pub(crate) mod migration_start;
|
||||
pub(crate) mod open_candidate_region;
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test_util;
|
||||
pub(crate) mod update_metadata;
|
||||
|
||||
use std::any::Any;
|
||||
use std::fmt::Debug;
|
||||
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::ClusterId;
|
||||
use common_procedure::error::{
|
||||
@@ -30,11 +32,11 @@ use common_procedure::error::{
|
||||
};
|
||||
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
use snafu::{location, Location, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use self::migration_start::RegionMigrationStart;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::procedure::utils::region_lock_key;
|
||||
use crate::region::lease_keeper::{OpeningRegionGuard, OpeningRegionKeeperRef};
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
@@ -74,8 +76,10 @@ pub struct VolatileContext {
|
||||
///
|
||||
/// `opening_region_guard` should be consumed after
|
||||
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
|
||||
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue) .
|
||||
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
|
||||
opening_region_guard: Option<OpeningRegionGuard>,
|
||||
/// `table_route_info` is stored via previous steps for future use.
|
||||
table_route_info: Option<DeserializedValueWithBytes<TableRouteValue>>,
|
||||
}
|
||||
|
||||
/// Used to generate new [Context].
|
||||
@@ -122,6 +126,47 @@ impl Context {
|
||||
pub fn server_addr(&self) -> &str {
|
||||
&self.server_addr
|
||||
}
|
||||
|
||||
/// Returns the `table_route_value` of [VolatileContext] if any.
|
||||
/// Otherwise, returns the value retrieved from remote.
|
||||
///
|
||||
/// Retry:
|
||||
/// - Failed to retrieve the metadata of table.
|
||||
pub async fn get_table_route_value(
|
||||
&mut self,
|
||||
) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
|
||||
let table_route_value = &mut self.volatile_ctx.table_route_info;
|
||||
|
||||
if table_route_value.is_none() {
|
||||
let table_id = self.persistent_ctx.region_id.table_id();
|
||||
let table_route = self
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map_err(|e| error::Error::RetryLater {
|
||||
reason: e.to_string(),
|
||||
location: location!(),
|
||||
})?
|
||||
.context(error::TableRouteNotFoundSnafu { table_id })?;
|
||||
|
||||
*table_route_value = Some(table_route);
|
||||
}
|
||||
|
||||
Ok(table_route_value.as_ref().unwrap())
|
||||
}
|
||||
|
||||
/// Removes the `table_route_value` of [VolatileContext], returns true if any.
|
||||
pub fn remove_table_route_value(&mut self) -> bool {
|
||||
let value = self.volatile_ctx.table_route_info.take();
|
||||
value.is_some()
|
||||
}
|
||||
|
||||
/// Returns the [RegionId].
|
||||
pub fn region_id(&self) -> RegionId {
|
||||
self.persistent_ctx.region_id
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::any::Any;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{location, Location, OptionExt, ResultExt};
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::downgrade_leader_region::DowngradeLeaderRegion;
|
||||
@@ -41,9 +41,8 @@ impl State for RegionMigrationStart {
|
||||
/// Otherwise go to the OpenCandidateRegion state.
|
||||
async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
|
||||
let region_id = ctx.persistent_ctx.region_id;
|
||||
let to_peer = &ctx.persistent_ctx.to_peer;
|
||||
|
||||
let region_route = self.retrieve_region_route(ctx, region_id).await?;
|
||||
let to_peer = &ctx.persistent_ctx.to_peer;
|
||||
|
||||
if self.check_leader_region_on_peer(&region_route, to_peer)? {
|
||||
Ok(Box::new(RegionMigrationEnd))
|
||||
@@ -70,21 +69,11 @@ impl RegionMigrationStart {
|
||||
/// - Failed to retrieve the metadata of table.
|
||||
async fn retrieve_region_route(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
ctx: &mut Context,
|
||||
region_id: RegionId,
|
||||
) -> Result<RegionRoute> {
|
||||
let table_id = region_id.table_id();
|
||||
let table_route = ctx
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map_err(|e| error::Error::RetryLater {
|
||||
reason: e.to_string(),
|
||||
location: location!(),
|
||||
})?
|
||||
.context(error::TableRouteNotFoundSnafu { table_id })?;
|
||||
let table_route = ctx.get_table_route_value().await?;
|
||||
|
||||
let region_route = table_route
|
||||
.region_routes
|
||||
@@ -165,10 +154,10 @@ mod tests {
|
||||
let state = RegionMigrationStart;
|
||||
let env = TestingEnv::new();
|
||||
let persistent_context = new_persistent_context();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let err = state
|
||||
.retrieve_region_route(&ctx, RegionId::new(1024, 1))
|
||||
.retrieve_region_route(&mut ctx, RegionId::new(1024, 1))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
@@ -184,7 +173,7 @@ mod tests {
|
||||
let from_peer = persistent_context.from_peer.clone();
|
||||
|
||||
let env = TestingEnv::new();
|
||||
let ctx = env.context_factory().new_context(persistent_context);
|
||||
let mut ctx = env.context_factory().new_context(persistent_context);
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let region_route = RegionRoute {
|
||||
@@ -199,7 +188,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let err = state
|
||||
.retrieve_region_route(&ctx, RegionId::new(1024, 3))
|
||||
.retrieve_region_route(&mut ctx, RegionId::new(1024, 3))
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
|
||||
239
src/meta-srv/src/procedure/region_migration/update_metadata.rs
Normal file
239
src/meta-srv/src/procedure/region_migration/update_metadata.rs
Normal file
@@ -0,0 +1,239 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
use common_meta::rpc::router::RegionStatus;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
|
||||
use crate::procedure::region_migration::{Context, State};
|
||||
|
||||
/// The "update table metadata" step of the region migration procedure.
///
/// Serialized with an internal `"UpdateMetadata"` tag, so a variant
/// round-trips as e.g. `{"UpdateMetadata":"Downgrade"}` (pinned by
/// `test_state_serialization` in this file).
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "UpdateMetadata")]
pub enum UpdateMetadata {
    /// Marks the leader region of the migrating region as downgraded
    /// in the table route metadata.
    Downgrade,
}
|
||||
|
||||
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateMetadata {
    /// Applies the metadata update for the current variant, then
    /// transitions to the next migration state.
    ///
    /// For [UpdateMetadata::Downgrade]: downgrades the leader region in
    /// the table route, then hands off to [DowngradeLeaderRegion].
    async fn next(&mut self, ctx: &mut Context) -> Result<Box<dyn State>> {
        match self {
            UpdateMetadata::Downgrade => {
                self.downgrade_leader_region(ctx).await?;

                Ok(Box::new(DowngradeLeaderRegion))
            }
        }
    }

    // Enables downcasting a `dyn State` to the concrete type (used by tests).
    fn as_any(&self) -> &dyn Any {
        self
    }
}
|
||||
|
||||
impl UpdateMetadata {
    /// Downgrades the leader region.
    ///
    /// Abort(non-retry):
    /// - TableRoute is not found.
    ///
    /// Retry:
    /// - Failed to update [TableRouteValue](common_meta::key::table_route::TableRouteValue).
    /// - Failed to retrieve the metadata of table.
    ///
    /// About the failure of updating the [TableRouteValue](common_meta::key::table_route::TableRouteValue):
    ///
    /// - There may be another [RegionMigrationProcedure](crate::procedure::region_migration::RegionMigrationProcedure)
    /// that is executed concurrently for **other region**.
    /// It will only update **other region** info. Therefore, It's safe to retry after failure.
    ///
    /// - There is no other DDL procedure executed concurrently for the current table.
    async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> {
        // Cloned up front so the manager can be used while `ctx` is
        // borrowed by `get_table_route_value` below.
        let table_metadata_manager = ctx.table_metadata_manager.clone();
        let region_id = ctx.region_id();
        let table_id = region_id.table_id();
        // Returns the cached table route if present, otherwise fetches it
        // from remote (retryable on fetch failure).
        let current_table_route_value = ctx.get_table_route_value().await?;

        if let Err(err) = table_metadata_manager
            .update_leader_region_status(table_id, current_table_route_value, |route| {
                if route.region.id == region_id {
                    Some(Some(RegionStatus::Downgraded))
                } else {
                    // Leave all other regions' status untouched.
                    None
                }
            })
            .await
            .context(error::TableMetadataManagerSnafu)
        {
            // The cached route is presumably stale (the stored value no
            // longer matches) — drop it so a retry re-fetches a fresh one.
            debug_assert!(ctx.remove_table_route_value());
            return error::RetryLaterSnafu {
                reason: format!("Failed to update the table route during the downgrading leader region, error: {err}")
            }.fail();
        }

        // Also invalidate the cache after a successful update; the stored
        // route has changed and must not be reused by later steps.
        debug_assert!(ctx.remove_table_route_value());

        Ok(())
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};

    /// Builds a context migrating region 1 of table 1024 from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            cluster_id: 0,
        }
    }

    // The state must serialize with the internal "UpdateMetadata" tag so it
    // can be persisted and restored by the procedure framework.
    #[test]
    fn test_state_serialization() {
        let state = UpdateMetadata::Downgrade;
        let expected = r#"{"UpdateMetadata":"Downgrade"}"#;
        assert_eq!(expected, serde_json::to_string(&state).unwrap());
    }

    // With no table metadata created, downgrading must abort (non-retryable):
    // the table route simply does not exist.
    #[tokio::test]
    async fn test_table_route_is_not_found_error() {
        let state = UpdateMetadata::Downgrade;
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });

        assert!(!err.is_retryable());
    }

    // Seeding the volatile context with a stale table route makes the
    // update fail; the state must surface a retryable `RetryLater` error.
    #[tokio::test]
    async fn test_failed_to_update_table_route_error() {
        let state = UpdateMetadata::Downgrade;
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_id = ctx.region_id().table_id();

        let table_info = new_test_table_info(1024, vec![1, 2]).into();
        let region_routes = vec![
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 1)),
                leader_peer: Some(from_peer.clone()),
                ..Default::default()
            },
            RegionRoute {
                region: Region::new_test(RegionId::new(1024, 2)),
                leader_peer: Some(Peer::empty(4)),
                ..Default::default()
            },
        ];

        let table_metadata_manager = env.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(table_info, region_routes)
            .await
            .unwrap();

        let original_table_route = table_metadata_manager
            .table_route_manager()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        // modifies the table route.
        table_metadata_manager
            .update_leader_region_status(table_id, &original_table_route, |route| {
                if route.region.id == RegionId::new(1024, 2) {
                    Some(Some(RegionStatus::Downgraded))
                } else {
                    None
                }
            })
            .await
            .unwrap();

        // sets the old table route.
        ctx.volatile_ctx.table_route_info = Some(original_table_route);

        let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });

        assert!(err.is_retryable());
        assert!(err.to_string().contains("Failed to update the table route"));
    }

    // Happy path: `next` downgrades the leader region, transitions to
    // `DowngradeLeaderRegion`, and clears the cached table route.
    #[tokio::test]
    async fn test_next_downgrade_leader_region_state() {
        let mut state = Box::new(UpdateMetadata::Downgrade);
        let persistent_context = new_persistent_context();
        let from_peer = persistent_context.from_peer.clone();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let table_id = ctx.region_id().table_id();

        let table_info = new_test_table_info(1024, vec![1, 2]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(from_peer.clone()),
            ..Default::default()
        }];

        let table_metadata_manager = env.table_metadata_manager();
        table_metadata_manager
            .create_table_metadata(table_info, region_routes)
            .await
            .unwrap();

        let next = state.next(&mut ctx).await.unwrap();

        let _ = next
            .as_any()
            .downcast_ref::<DowngradeLeaderRegion>()
            .unwrap();

        let latest_table_route = table_metadata_manager
            .table_route_manager()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        assert!(latest_table_route.region_routes[0].is_leader_downgraded());
        assert!(ctx.volatile_ctx.table_route_info.is_none());
    }
}
|
||||
Reference in New Issue
Block a user