mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
refactor: remove useless region follower legacy code (#5795)
This commit is contained in:
@@ -1,229 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod manager;
|
||||
|
||||
pub mod add_region_follower;
|
||||
mod create;
|
||||
mod remove;
|
||||
pub mod remove_region_follower;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
|
||||
use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
|
||||
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::{distributed_time_constants, DatanodeId};
|
||||
use common_procedure::StringKey;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use strum::AsRefStr;
|
||||
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
use crate::error::{self, Result};
|
||||
use crate::lease::lookup_datanode_peer;
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// The context of add/remove region follower procedure.
|
||||
pub struct Context {
|
||||
/// The table metadata manager.
|
||||
pub table_metadata_manager: TableMetadataManagerRef,
|
||||
/// The mailbox.
|
||||
pub mailbox: MailboxRef,
|
||||
/// The metasrv's address.
|
||||
pub server_addr: String,
|
||||
/// The cache invalidator.
|
||||
pub cache_invalidator: CacheInvalidatorRef,
|
||||
/// The meta peer client.
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AlterRegionFollowerData {
|
||||
/// The catalog name.
|
||||
pub(crate) catalog: String,
|
||||
/// The schema name.
|
||||
pub(crate) schema: String,
|
||||
/// The region id.
|
||||
pub(crate) region_id: RegionId,
|
||||
/// The peer id of the datanode to add region follower.
|
||||
pub(crate) peer_id: u64,
|
||||
/// The peer of the datanode to add region follower.
|
||||
pub(crate) peer: Option<Peer>,
|
||||
/// The datanode table value of the region.
|
||||
pub(crate) datanode_table_value: Option<DatanodeTableValue>,
|
||||
/// The physical table route of the region.
|
||||
pub(crate) table_route: Option<(
|
||||
DeserializedValueWithBytes<TableRouteValue>,
|
||||
PhysicalTableRouteValue,
|
||||
)>,
|
||||
/// The state.
|
||||
pub(crate) state: AlterRegionFollowerState,
|
||||
}
|
||||
|
||||
impl AlterRegionFollowerData {
|
||||
pub fn lock_key(&self) -> Vec<StringKey> {
|
||||
let region_id = self.region_id;
|
||||
let lock_key = vec![
|
||||
CatalogLock::Read(&self.catalog).into(),
|
||||
SchemaLock::read(&self.catalog, &self.schema).into(),
|
||||
// The optimistic updating of table route is not working very well,
|
||||
// so we need to use the write lock here.
|
||||
TableLock::Write(region_id.table_id()).into(),
|
||||
RegionLock::Write(region_id).into(),
|
||||
];
|
||||
|
||||
lock_key
|
||||
}
|
||||
|
||||
pub(crate) fn datanode_peer(&self) -> Option<&Peer> {
|
||||
self.peer.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn physical_table_route(&self) -> Option<&PhysicalTableRouteValue> {
|
||||
self.table_route
|
||||
.as_ref()
|
||||
.map(|(_, table_route)| table_route)
|
||||
}
|
||||
|
||||
/// Returns the region info of the region.
|
||||
pub(crate) fn region_info(&self) -> Option<RegionInfo> {
|
||||
self.datanode_table_value
|
||||
.as_ref()
|
||||
.map(|datanode_table_value| datanode_table_value.region_info.clone())
|
||||
}
|
||||
|
||||
/// Loads the datanode peer.
|
||||
pub(crate) async fn load_datanode_peer(&self, ctx: &Context) -> Result<Option<Peer>> {
|
||||
let peer = lookup_datanode_peer(
|
||||
self.peer_id,
|
||||
&ctx.meta_peer_client,
|
||||
distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
)
|
||||
.await?
|
||||
.context(error::PeerUnavailableSnafu {
|
||||
peer_id: self.peer_id,
|
||||
})?;
|
||||
|
||||
Ok(Some(peer))
|
||||
}
|
||||
|
||||
/// Loads the datanode table value of the region.
|
||||
pub(crate) async fn load_datanode_table_value(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
datanode_id: DatanodeId,
|
||||
) -> Result<Option<DatanodeTableValue>> {
|
||||
let table_id = self.region_id.table_id();
|
||||
let datanode_table_key = DatanodeTableKey {
|
||||
datanode_id,
|
||||
table_id,
|
||||
};
|
||||
|
||||
let datanode_table_value = ctx
|
||||
.table_metadata_manager
|
||||
.datanode_table_manager()
|
||||
.get(&datanode_table_key)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| error::RetryLaterWithSourceSnafu {
|
||||
reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
|
||||
})?
|
||||
.context(error::DatanodeTableNotFoundSnafu {
|
||||
table_id,
|
||||
datanode_id,
|
||||
})?;
|
||||
|
||||
Ok(Some(datanode_table_value))
|
||||
}
|
||||
|
||||
/// Loads the table route of the region, returns the physical table id.
|
||||
pub(crate) async fn load_table_route(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
) -> Result<
|
||||
Option<(
|
||||
DeserializedValueWithBytes<TableRouteValue>,
|
||||
PhysicalTableRouteValue,
|
||||
)>,
|
||||
> {
|
||||
let table_id = self.region_id.table_id();
|
||||
let raw_table_route = ctx
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.get_with_raw_bytes(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| error::RetryLaterWithSourceSnafu {
|
||||
reason: format!("Failed to get TableRoute: {table_id}"),
|
||||
})?
|
||||
.context(error::TableRouteNotFoundSnafu { table_id })?;
|
||||
let table_route = raw_table_route.clone().into_inner();
|
||||
|
||||
ensure!(
|
||||
table_route.is_physical(),
|
||||
error::LogicalTableCannotAddFollowerSnafu { table_id }
|
||||
);
|
||||
|
||||
Ok(Some((
|
||||
raw_table_route,
|
||||
table_route.into_physical_table_route(),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
|
||||
pub enum AlterRegionFollowerState {
|
||||
/// Prepares to alter region follower.
|
||||
Prepare,
|
||||
/// Sends alter region follower request to Datanode.
|
||||
SubmitRequest,
|
||||
/// Updates table metadata.
|
||||
UpdateMetadata,
|
||||
/// Broadcasts the invalidate table route cache message.
|
||||
InvalidateTableCache,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_data_serialization() {
|
||||
let data = AlterRegionFollowerData {
|
||||
catalog: "test_catalog".to_string(),
|
||||
schema: "test_schema".to_string(),
|
||||
region_id: RegionId::new(1, 1),
|
||||
peer_id: 1,
|
||||
peer: None,
|
||||
datanode_table_value: None,
|
||||
table_route: None,
|
||||
state: AlterRegionFollowerState::Prepare,
|
||||
};
|
||||
|
||||
assert_eq!(data.region_id.as_u64(), 4294967297);
|
||||
let serialized = serde_json::to_string(&data).unwrap();
|
||||
let expected = r#"{"catalog":"test_catalog","schema":"test_schema","region_id":4294967297,"peer_id":1,"peer":null,"datanode_table_value":null,"table_route":null,"state":"Prepare"}"#;
|
||||
assert_eq!(expected, serialized);
|
||||
}
|
||||
}
|
||||
@@ -1,264 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_procedure::error::ToJsonSnafu;
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
Result as ProcedureResult, Status,
|
||||
};
|
||||
use common_telemetry::info;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::create::CreateFollower;
|
||||
use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context};
|
||||
use crate::error::{self, Result};
|
||||
use crate::metrics;
|
||||
|
||||
/// The procedure to add a region follower.
|
||||
pub struct AddRegionFollowerProcedure {
|
||||
pub data: AlterRegionFollowerData,
|
||||
pub context: Context,
|
||||
}
|
||||
|
||||
impl AddRegionFollowerProcedure {
|
||||
pub const TYPE_NAME: &'static str = "metasrv-procedure::AddRegionFollower";
|
||||
|
||||
pub fn new(
|
||||
catalog: String,
|
||||
schema: String,
|
||||
region_id: RegionId,
|
||||
peer_id: u64,
|
||||
context: Context,
|
||||
) -> Self {
|
||||
Self {
|
||||
data: AlterRegionFollowerData {
|
||||
catalog,
|
||||
schema,
|
||||
region_id,
|
||||
peer_id,
|
||||
peer: None,
|
||||
datanode_table_value: None,
|
||||
table_route: None,
|
||||
state: AlterRegionFollowerState::Prepare,
|
||||
},
|
||||
context,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
|
||||
let data: AlterRegionFollowerData = serde_json::from_str(json).unwrap();
|
||||
Ok(Self { data, context })
|
||||
}
|
||||
|
||||
pub async fn on_prepare(&mut self) -> Result<Status> {
|
||||
// loads the datanode peer and check peer is alive
|
||||
self.data.peer = self.data.load_datanode_peer(&self.context).await?;
|
||||
|
||||
// loads the table route of the region
|
||||
self.data.table_route = self.data.load_table_route(&self.context).await?;
|
||||
let region_leader_datanode_id = {
|
||||
let table_route = self.data.physical_table_route().unwrap();
|
||||
table_route
|
||||
.region_routes
|
||||
.iter()
|
||||
.find(|region_route| region_route.region.id == self.data.region_id)
|
||||
.map(|region_route| region_route.leader_peer.as_ref().unwrap().id)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
// loads the datanode table value
|
||||
self.data.datanode_table_value = self
|
||||
.data
|
||||
.load_datanode_table_value(&self.context, region_leader_datanode_id)
|
||||
.await?;
|
||||
|
||||
let table_route = self.data.physical_table_route().unwrap();
|
||||
|
||||
let datanode_peer = self.data.datanode_peer().unwrap();
|
||||
|
||||
// check if the destination peer is already a leader/follower of the region
|
||||
for region_route in &table_route.region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
let Some(leader_peer) = ®ion_route.leader_peer else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// check if the destination peer is already a leader of the region
|
||||
if leader_peer.id == datanode_peer.id {
|
||||
return error::RegionFollowerLeaderConflictSnafu {
|
||||
region_id: self.data.region_id,
|
||||
peer_id: datanode_peer.id,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
// check if the destination peer is already a follower of the region
|
||||
if region_route
|
||||
.follower_peers
|
||||
.iter()
|
||||
.any(|peer| peer.id == datanode_peer.id)
|
||||
{
|
||||
return error::MultipleRegionFollowersOnSameNodeSnafu {
|
||||
region_id: self.data.region_id,
|
||||
peer_id: datanode_peer.id,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Add region({}) follower procedure is preparing, peer: {datanode_peer:?}",
|
||||
self.data.region_id
|
||||
);
|
||||
self.data.state = AlterRegionFollowerState::SubmitRequest;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_submit_request(&mut self) -> Result<Status> {
|
||||
let region_id = self.data.region_id;
|
||||
// Safety: we have already set the peer in `on_prepare``.
|
||||
let peer = self.data.peer.clone().unwrap();
|
||||
let create_follower = CreateFollower::new(region_id, peer);
|
||||
let instruction = create_follower
|
||||
.build_open_region_instruction(self.data.region_info().unwrap())
|
||||
.await?;
|
||||
create_follower
|
||||
.send_open_region_instruction(&self.context, instruction)
|
||||
.await?;
|
||||
self.data.state = AlterRegionFollowerState::UpdateMetadata;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_update_metadata(&mut self) -> Result<Status> {
|
||||
// Safety: we have already load the table route in `on_prepare``.
|
||||
let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap();
|
||||
|
||||
let mut new_region_routes = phy_table_route.region_routes.clone();
|
||||
for region_route in &mut new_region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
region_route
|
||||
.follower_peers
|
||||
.push(self.data.peer.clone().unwrap());
|
||||
}
|
||||
|
||||
// Safety: we have already load the region info in `on_prepare`.
|
||||
let region_info = self.data.region_info().unwrap();
|
||||
let new_region_options = region_info.region_options.clone();
|
||||
let new_region_wal_options = region_info.region_wal_options.clone();
|
||||
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_route(
|
||||
self.data.region_id.table_id(),
|
||||
region_info,
|
||||
current_table_route_value,
|
||||
new_region_routes,
|
||||
&new_region_options,
|
||||
&new_region_wal_options,
|
||||
)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?;
|
||||
self.data.state = AlterRegionFollowerState::InvalidateTableCache;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let table_id = self.data.region_id.table_id();
|
||||
// ignore the result
|
||||
let ctx = common_meta::cache_invalidator::Context::default();
|
||||
let _ = self
|
||||
.context
|
||||
.cache_invalidator
|
||||
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
|
||||
.await;
|
||||
Ok(Status::done())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Procedure for AddRegionFollowerProcedure {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let state = &self.data.state;
|
||||
|
||||
let _timer = metrics::METRIC_META_ADD_REGION_FOLLOWER_EXECUTE
|
||||
.with_label_values(&[state.as_ref()])
|
||||
.start_timer();
|
||||
|
||||
match state {
|
||||
AlterRegionFollowerState::Prepare => self.on_prepare().await,
|
||||
AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await,
|
||||
AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await,
|
||||
}
|
||||
.map_err(|e| {
|
||||
if e.is_retryable() {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
serde_json::to_string(&self.data).context(ToJsonSnafu)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
LockKey::new(self.data.lock_key())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
|
||||
|
||||
use super::*;
|
||||
use crate::procedure::region_follower::test_util::TestingEnv;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lock_key() {
|
||||
let env = TestingEnv::new();
|
||||
let context = env.new_context();
|
||||
|
||||
let procedure = AddRegionFollowerProcedure::new(
|
||||
"test_catalog".to_string(),
|
||||
"test_schema".to_string(),
|
||||
RegionId::new(1, 1),
|
||||
1,
|
||||
context,
|
||||
);
|
||||
|
||||
let key = procedure.lock_key();
|
||||
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(keys.len(), 4);
|
||||
assert!(keys.contains(&CatalogLock::Read("test_catalog").into()));
|
||||
assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into()));
|
||||
assert!(keys.contains(&TableLock::Write(1).into()));
|
||||
assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into()));
|
||||
}
|
||||
}
|
||||
@@ -1,252 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_procedure::error::ToJsonSnafu;
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
Result as ProcedureResult, Status,
|
||||
};
|
||||
use common_telemetry::info;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::remove::RemoveFollower;
|
||||
use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context};
|
||||
use crate::error::{self, Result};
|
||||
use crate::metrics;
|
||||
|
||||
/// The procedure to remove a region follower.
|
||||
pub struct RemoveRegionFollowerProcedure {
|
||||
pub data: AlterRegionFollowerData,
|
||||
pub context: Context,
|
||||
}
|
||||
|
||||
impl RemoveRegionFollowerProcedure {
|
||||
pub const TYPE_NAME: &'static str = "metasrv-procedure::RemoveRegionFollower";
|
||||
|
||||
pub fn new(
|
||||
catalog: String,
|
||||
schema: String,
|
||||
region_id: RegionId,
|
||||
peer_id: u64,
|
||||
context: Context,
|
||||
) -> Self {
|
||||
Self {
|
||||
data: AlterRegionFollowerData {
|
||||
catalog,
|
||||
schema,
|
||||
region_id,
|
||||
peer_id,
|
||||
peer: None,
|
||||
datanode_table_value: None,
|
||||
table_route: None,
|
||||
state: AlterRegionFollowerState::Prepare,
|
||||
},
|
||||
context,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
|
||||
let data: AlterRegionFollowerData = serde_json::from_str(json).unwrap();
|
||||
Ok(Self { data, context })
|
||||
}
|
||||
|
||||
pub async fn on_prepare(&mut self) -> Result<Status> {
|
||||
// loads the datanode peer and check peer is alive
|
||||
self.data.peer = self.data.load_datanode_peer(&self.context).await?;
|
||||
|
||||
// loads the table route of the region
|
||||
self.data.table_route = self.data.load_table_route(&self.context).await?;
|
||||
|
||||
// loads the table route of the region
|
||||
self.data.table_route = self.data.load_table_route(&self.context).await?;
|
||||
let region_leader_datanode_id = {
|
||||
let table_route = self.data.physical_table_route().unwrap();
|
||||
table_route
|
||||
.region_routes
|
||||
.iter()
|
||||
.find(|region_route| region_route.region.id == self.data.region_id)
|
||||
.map(|region_route| region_route.leader_peer.as_ref().unwrap().id)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
// loads the datanode table value
|
||||
self.data.datanode_table_value = self
|
||||
.data
|
||||
.load_datanode_table_value(&self.context, region_leader_datanode_id)
|
||||
.await?;
|
||||
|
||||
let table_route = self.data.physical_table_route().unwrap();
|
||||
|
||||
// check if the destination peer has this region
|
||||
for region_route in &table_route.region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
ensure!(
|
||||
!region_route
|
||||
.follower_peers
|
||||
.iter()
|
||||
.any(|peer| peer.id == self.data.peer_id),
|
||||
error::RegionFollowerNotExistsSnafu {
|
||||
region_id: self.data.region_id,
|
||||
peer_id: self.data.peer_id,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
"Remove region({}) follower procedure is preparing, peer: {:?}",
|
||||
self.data.region_id,
|
||||
self.data.datanode_peer()
|
||||
);
|
||||
self.data.state = AlterRegionFollowerState::SubmitRequest;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_submit_request(&mut self) -> Result<Status> {
|
||||
let region_id = self.data.region_id;
|
||||
// Safety: we have already set the peer in `on_prepare``.
|
||||
let peer = self.data.peer.clone().unwrap();
|
||||
let remove_follower = RemoveFollower::new(region_id, peer);
|
||||
let instruction = remove_follower
|
||||
.build_close_region_instruction(self.data.region_info().unwrap())
|
||||
.await?;
|
||||
remove_follower
|
||||
.send_close_region_instruction(&self.context, instruction)
|
||||
.await?;
|
||||
self.data.state = AlterRegionFollowerState::UpdateMetadata;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_update_metadata(&mut self) -> Result<Status> {
|
||||
// Safety: we have already load the table route in `on_prepare``.
|
||||
let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap();
|
||||
|
||||
let mut new_region_routes = phy_table_route.region_routes.clone();
|
||||
for region_route in &mut new_region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
// remove the follower peer from the region route
|
||||
region_route
|
||||
.follower_peers
|
||||
.retain(|peer| peer.id != self.data.peer_id);
|
||||
}
|
||||
|
||||
// Safety: we have already load the region info in `on_prepare`.
|
||||
let region_info = self.data.region_info().unwrap();
|
||||
let new_region_options = region_info.region_options.clone();
|
||||
let new_region_wal_options = region_info.region_wal_options.clone();
|
||||
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_route(
|
||||
self.data.region_id.table_id(),
|
||||
region_info,
|
||||
current_table_route_value,
|
||||
new_region_routes,
|
||||
&new_region_options,
|
||||
&new_region_wal_options,
|
||||
)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?;
|
||||
self.data.state = AlterRegionFollowerState::InvalidateTableCache;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let table_id = self.data.region_id.table_id();
|
||||
// ignore the result
|
||||
let ctx = common_meta::cache_invalidator::Context::default();
|
||||
let _ = self
|
||||
.context
|
||||
.cache_invalidator
|
||||
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
|
||||
.await;
|
||||
Ok(Status::done())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Procedure for RemoveRegionFollowerProcedure {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let state = &self.data.state;
|
||||
|
||||
let _timer = metrics::METRIC_META_REMOVE_REGION_FOLLOWER_EXECUTE
|
||||
.with_label_values(&[state.as_ref()])
|
||||
.start_timer();
|
||||
|
||||
match state {
|
||||
AlterRegionFollowerState::Prepare => self.on_prepare().await,
|
||||
AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await,
|
||||
AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await,
|
||||
}
|
||||
.map_err(|e| {
|
||||
if e.is_retryable() {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
serde_json::to_string(&self.data).context(ToJsonSnafu)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
LockKey::new(self.data.lock_key())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
|
||||
|
||||
use super::*;
|
||||
use crate::procedure::region_follower::test_util::TestingEnv;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lock_key() {
|
||||
let env = TestingEnv::new();
|
||||
let context = env.new_context();
|
||||
|
||||
let procedure = RemoveRegionFollowerProcedure::new(
|
||||
"test_catalog".to_string(),
|
||||
"test_schema".to_string(),
|
||||
RegionId::new(1, 1),
|
||||
1,
|
||||
context,
|
||||
);
|
||||
|
||||
let key = procedure.lock_key();
|
||||
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(keys.len(), 4);
|
||||
assert!(keys.contains(&CatalogLock::Read("test_catalog").into()));
|
||||
assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into()));
|
||||
assert!(keys.contains(&TableLock::Write(1).into()));
|
||||
assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into()));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user