mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
feat: alter region follower (#5676)
* feat: add region follower manager * feat: add region procudure * refactor: make add, remove follower procedure look nice * feat: add region follower procedure * chore: undo some chane, possibly made by AI * feat: on prepare cheking * feat: on update metadata * feat: on broadcast * chore: unit test * feat: add remove follower operation * feat: add or remove region follower procedure * chore: ut * chore: rename * chore: by comment * chore: by comment --------- Co-authored-by: jeremy <jeremy@greptime.local>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -6717,6 +6717,7 @@ dependencies = [
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"store-api",
|
||||
"strum 0.25.0",
|
||||
"table",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
|
||||
@@ -34,6 +34,24 @@ pub struct MigrateRegionRequest {
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
/// A request to add region follower.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AddRegionFollowerRequest {
|
||||
/// The region id to add follower.
|
||||
pub region_id: u64,
|
||||
/// The peer id to add follower.
|
||||
pub peer_id: u64,
|
||||
}
|
||||
|
||||
/// A request to remove region follower.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RemoveRegionFollowerRequest {
|
||||
/// The region id to remove follower.
|
||||
pub region_id: u64,
|
||||
/// The peer id to remove follower.
|
||||
pub peer_id: u64,
|
||||
}
|
||||
|
||||
/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
|
||||
pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
|
||||
ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {
|
||||
|
||||
@@ -61,6 +61,7 @@ serde_json.workspace = true
|
||||
servers.workspace = true
|
||||
snafu.workspace = true
|
||||
store-api.workspace = true
|
||||
strum.workspace = true
|
||||
table.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-postgres = { workspace = true, optional = true, features = ["with-chrono-0_4"] }
|
||||
|
||||
@@ -749,6 +749,41 @@ pub enum Error {
|
||||
location: Location,
|
||||
source: common_meta::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Logical table cannot add follower: {table_id}"))]
|
||||
LogicalTableCannotAddFollower {
|
||||
table_id: TableId,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"A region follower cannot be placed on the same node as the leader: {region_id}, {peer_id}"
|
||||
))]
|
||||
RegionFollowerLeaderConflict {
|
||||
region_id: RegionId,
|
||||
peer_id: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Multiple region followers cannot be placed on the same node: {region_id}, {peer_id}"
|
||||
))]
|
||||
MultipleRegionFollowersOnSameNode {
|
||||
region_id: RegionId,
|
||||
peer_id: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Region follower not exists: {region_id}, {peer_id}"))]
|
||||
RegionFollowerNotExists {
|
||||
region_id: RegionId,
|
||||
peer_id: u64,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl Error {
|
||||
@@ -818,6 +853,10 @@ impl ErrorExt for Error {
|
||||
| Error::ProcedureNotFound { .. }
|
||||
| Error::TooManyPartitions { .. }
|
||||
| Error::TomlFormat { .. }
|
||||
| Error::LogicalTableCannotAddFollower { .. }
|
||||
| Error::RegionFollowerLeaderConflict { .. }
|
||||
| Error::MultipleRegionFollowersOnSameNode { .. }
|
||||
| Error::RegionFollowerNotExists { .. }
|
||||
| Error::HandlerNotFound { .. } => StatusCode::InvalidArguments,
|
||||
Error::LeaseKeyFromUtf8 { .. }
|
||||
| Error::LeaseValueFromUtf8 { .. }
|
||||
|
||||
@@ -55,6 +55,8 @@ use crate::lease::MetaPeerLookupService;
|
||||
use crate::metasrv::{
|
||||
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
|
||||
};
|
||||
use crate::procedure::region_follower::manager::RegionFollowerManager;
|
||||
use crate::procedure::region_follower::Context as ArfContext;
|
||||
use crate::procedure::region_migration::manager::RegionMigrationManager;
|
||||
use crate::procedure::region_migration::DefaultContextFactory;
|
||||
use crate::region::supervisor::{
|
||||
@@ -292,6 +294,7 @@ impl MetasrvBuilder {
|
||||
(Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
|
||||
};
|
||||
|
||||
// region migration manager
|
||||
let region_migration_manager = Arc::new(RegionMigrationManager::new(
|
||||
procedure_manager.clone(),
|
||||
DefaultContextFactory::new(
|
||||
@@ -342,6 +345,19 @@ impl MetasrvBuilder {
|
||||
.context(error::InitDdlManagerSnafu)?,
|
||||
);
|
||||
|
||||
// alter region follower manager
|
||||
let region_follower_manager = Arc::new(RegionFollowerManager::new(
|
||||
procedure_manager.clone(),
|
||||
ArfContext {
|
||||
table_metadata_manager: table_metadata_manager.clone(),
|
||||
mailbox: mailbox.clone(),
|
||||
server_addr: options.server_addr.clone(),
|
||||
cache_invalidator: cache_invalidator.clone(),
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
},
|
||||
));
|
||||
region_follower_manager.try_start()?;
|
||||
|
||||
let handler_group_builder = match handler_group_builder {
|
||||
Some(handler_group_builder) => handler_group_builder,
|
||||
None => {
|
||||
|
||||
@@ -60,5 +60,10 @@ lazy_static! {
|
||||
/// The migration fail counter.
|
||||
pub static ref METRIC_META_REGION_MIGRATION_FAIL: IntCounter =
|
||||
register_int_counter!("greptime_meta_region_migration_fail", "meta region migration fail").unwrap();
|
||||
|
||||
/// The add region follower execute histogram.
|
||||
pub static ref METRIC_META_ADD_REGION_FOLLOWER_EXECUTE: HistogramVec =
|
||||
register_histogram_vec!("greptime_meta_add_region_follower_execute", "meta add region follower execute", &["state"]).unwrap();
|
||||
/// The remove region follower execute histogram.
|
||||
pub static ref METRIC_META_REMOVE_REGION_FOLLOWER_EXECUTE: HistogramVec =
|
||||
register_histogram_vec!("greptime_meta_remove_region_follower_execute", "meta remove region follower execute", &["state"]).unwrap();
|
||||
}
|
||||
|
||||
@@ -18,8 +18,11 @@ use common_meta::leadership_notifier::LeadershipChangeListener;
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
use snafu::ResultExt;
|
||||
|
||||
pub mod region_follower;
|
||||
pub mod region_migration;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
pub mod utils;
|
||||
|
||||
|
||||
229
src/meta-srv/src/procedure/region_follower.rs
Normal file
229
src/meta-srv/src/procedure/region_follower.rs
Normal file
@@ -0,0 +1,229 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod manager;
|
||||
|
||||
pub mod add_region_follower;
|
||||
mod create;
|
||||
mod remove;
|
||||
pub mod remove_region_follower;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::distributed_time_constants;
|
||||
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
|
||||
use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue};
|
||||
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
|
||||
use common_meta::peer::Peer;
|
||||
use common_procedure::StringKey;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use strum::AsRefStr;
|
||||
|
||||
use crate::cluster::MetaPeerClientRef;
|
||||
use crate::error::{self, Result};
|
||||
use crate::lease::lookup_datanode_peer;
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// The context of add/remove region follower procedure.
|
||||
pub struct Context {
|
||||
/// The table metadata manager.
|
||||
pub table_metadata_manager: TableMetadataManagerRef,
|
||||
/// The mailbox.
|
||||
pub mailbox: MailboxRef,
|
||||
/// The metasrv's address.
|
||||
pub server_addr: String,
|
||||
/// The cache invalidator.
|
||||
pub cache_invalidator: CacheInvalidatorRef,
|
||||
/// The meta peer client.
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AlterRegionFollowerData {
|
||||
/// The catalog name.
|
||||
pub(crate) catalog: String,
|
||||
/// The schema name.
|
||||
pub(crate) schema: String,
|
||||
/// The region id.
|
||||
pub(crate) region_id: RegionId,
|
||||
/// The peer id of the datanode to add region follower.
|
||||
pub(crate) peer_id: u64,
|
||||
/// The peer of the datanode to add region follower.
|
||||
pub(crate) peer: Option<Peer>,
|
||||
/// The datanode table value of the region.
|
||||
pub(crate) datanode_table_value: Option<DatanodeTableValue>,
|
||||
/// The physical table route of the region.
|
||||
pub(crate) table_route: Option<(
|
||||
DeserializedValueWithBytes<TableRouteValue>,
|
||||
PhysicalTableRouteValue,
|
||||
)>,
|
||||
/// The state.
|
||||
pub(crate) state: AlterRegionFollowerState,
|
||||
}
|
||||
|
||||
impl AlterRegionFollowerData {
|
||||
pub fn lock_key(&self) -> Vec<StringKey> {
|
||||
let region_id = self.region_id;
|
||||
let lock_key = vec![
|
||||
CatalogLock::Read(&self.catalog).into(),
|
||||
SchemaLock::read(&self.catalog, &self.schema).into(),
|
||||
// The optimistic updating of table route is not working very well,
|
||||
// so we need to use the write lock here.
|
||||
TableLock::Write(region_id.table_id()).into(),
|
||||
RegionLock::Write(region_id).into(),
|
||||
];
|
||||
|
||||
lock_key
|
||||
}
|
||||
|
||||
pub(crate) fn datanode_peer(&self) -> Option<&Peer> {
|
||||
self.peer.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn physical_table_route(&self) -> Option<&PhysicalTableRouteValue> {
|
||||
self.table_route
|
||||
.as_ref()
|
||||
.map(|(_, table_route)| table_route)
|
||||
}
|
||||
|
||||
/// Returns the region info of the region.
|
||||
pub(crate) fn region_info(&self) -> Option<RegionInfo> {
|
||||
self.datanode_table_value
|
||||
.as_ref()
|
||||
.map(|datanode_table_value| datanode_table_value.region_info.clone())
|
||||
}
|
||||
|
||||
/// Loads the datanode peer.
|
||||
pub(crate) async fn load_datanode_peer(&self, ctx: &Context) -> Result<Option<Peer>> {
|
||||
let peer = lookup_datanode_peer(
|
||||
self.peer_id,
|
||||
&ctx.meta_peer_client,
|
||||
distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
)
|
||||
.await?
|
||||
.context(error::PeerUnavailableSnafu {
|
||||
peer_id: self.peer_id,
|
||||
})?;
|
||||
|
||||
Ok(Some(peer))
|
||||
}
|
||||
|
||||
/// Loads the datanode table value of the region.
|
||||
pub(crate) async fn load_datanode_table_value(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
) -> Result<Option<DatanodeTableValue>> {
|
||||
let table_id = self.region_id.table_id();
|
||||
let datanode_id = self.peer_id;
|
||||
let datanode_table_key = DatanodeTableKey {
|
||||
datanode_id,
|
||||
table_id,
|
||||
};
|
||||
|
||||
let datanode_table_value = ctx
|
||||
.table_metadata_manager
|
||||
.datanode_table_manager()
|
||||
.get(&datanode_table_key)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| error::RetryLaterWithSourceSnafu {
|
||||
reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
|
||||
})?
|
||||
.context(error::DatanodeTableNotFoundSnafu {
|
||||
table_id,
|
||||
datanode_id,
|
||||
})?;
|
||||
|
||||
Ok(Some(datanode_table_value))
|
||||
}
|
||||
|
||||
/// Loads the table route of the region, returns the physical table id.
|
||||
pub(crate) async fn load_table_route(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
) -> Result<
|
||||
Option<(
|
||||
DeserializedValueWithBytes<TableRouteValue>,
|
||||
PhysicalTableRouteValue,
|
||||
)>,
|
||||
> {
|
||||
let table_id = self.region_id.table_id();
|
||||
let raw_table_route = ctx
|
||||
.table_metadata_manager
|
||||
.table_route_manager()
|
||||
.table_route_storage()
|
||||
.get_with_raw_bytes(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| error::RetryLaterWithSourceSnafu {
|
||||
reason: format!("Failed to get TableRoute: {table_id}"),
|
||||
})?
|
||||
.context(error::TableRouteNotFoundSnafu { table_id })?;
|
||||
let table_route = raw_table_route.clone().into_inner();
|
||||
|
||||
ensure!(
|
||||
table_route.is_physical(),
|
||||
error::LogicalTableCannotAddFollowerSnafu { table_id }
|
||||
);
|
||||
|
||||
Ok(Some((
|
||||
raw_table_route,
|
||||
table_route.into_physical_table_route(),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
|
||||
pub enum AlterRegionFollowerState {
|
||||
/// Prepares to alter region follower.
|
||||
Prepare,
|
||||
/// Sends alter region follower request to Datanode.
|
||||
SubmitRequest,
|
||||
/// Updates table metadata.
|
||||
UpdateMetadata,
|
||||
/// Broadcasts the invalidate table route cache message.
|
||||
InvalidateTableCache,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_data_serialization() {
|
||||
let data = AlterRegionFollowerData {
|
||||
catalog: "test_catalog".to_string(),
|
||||
schema: "test_schema".to_string(),
|
||||
region_id: RegionId::new(1, 1),
|
||||
peer_id: 1,
|
||||
peer: None,
|
||||
datanode_table_value: None,
|
||||
table_route: None,
|
||||
state: AlterRegionFollowerState::Prepare,
|
||||
};
|
||||
|
||||
assert_eq!(data.region_id.as_u64(), 4294967297);
|
||||
let serialized = serde_json::to_string(&data).unwrap();
|
||||
let expected = r#"{"catalog":"test_catalog","schema":"test_schema","region_id":4294967297,"peer_id":1,"peer":null,"datanode_table_value":null,"table_route":null,"state":"Prepare"}"#;
|
||||
assert_eq!(expected, serialized);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,247 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_procedure::error::ToJsonSnafu;
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
Result as ProcedureResult, Status,
|
||||
};
|
||||
use common_telemetry::info;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::create::CreateFollower;
|
||||
use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context};
|
||||
use crate::error::{self, Result};
|
||||
use crate::metrics;
|
||||
|
||||
/// The procedure to add a region follower.
|
||||
pub struct AddRegionFollowerProcedure {
|
||||
pub data: AlterRegionFollowerData,
|
||||
pub context: Context,
|
||||
}
|
||||
|
||||
impl AddRegionFollowerProcedure {
|
||||
pub const TYPE_NAME: &'static str = "metasrv-procedure::AddRegionFollower";
|
||||
|
||||
pub fn new(
|
||||
catalog: String,
|
||||
schema: String,
|
||||
region_id: RegionId,
|
||||
peer_id: u64,
|
||||
context: Context,
|
||||
) -> Self {
|
||||
Self {
|
||||
data: AlterRegionFollowerData {
|
||||
catalog,
|
||||
schema,
|
||||
region_id,
|
||||
peer_id,
|
||||
peer: None,
|
||||
datanode_table_value: None,
|
||||
table_route: None,
|
||||
state: AlterRegionFollowerState::Prepare,
|
||||
},
|
||||
context,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
|
||||
let data: AlterRegionFollowerData = serde_json::from_str(json).unwrap();
|
||||
Ok(Self { data, context })
|
||||
}
|
||||
|
||||
pub async fn on_prepare(&mut self) -> Result<Status> {
|
||||
// loads the datanode peer and check peer is alive
|
||||
self.data.peer = self.data.load_datanode_peer(&self.context).await?;
|
||||
|
||||
// loads the datanode table value
|
||||
self.data.datanode_table_value = self.data.load_datanode_table_value(&self.context).await?;
|
||||
|
||||
// loads the table route of the region
|
||||
self.data.table_route = self.data.load_table_route(&self.context).await?;
|
||||
let table_route = self.data.physical_table_route().unwrap();
|
||||
let datanode_peer = self.data.datanode_peer().unwrap();
|
||||
|
||||
// check if the destination peer is already a leader/follower of the region
|
||||
for region_route in &table_route.region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
let Some(leader_peer) = ®ion_route.leader_peer else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// check if the destination peer is already a leader of the region
|
||||
if leader_peer.id == datanode_peer.id {
|
||||
return error::RegionFollowerLeaderConflictSnafu {
|
||||
region_id: self.data.region_id,
|
||||
peer_id: datanode_peer.id,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
// check if the destination peer is already a follower of the region
|
||||
if region_route
|
||||
.follower_peers
|
||||
.iter()
|
||||
.any(|peer| peer.id == datanode_peer.id)
|
||||
{
|
||||
return error::MultipleRegionFollowersOnSameNodeSnafu {
|
||||
region_id: self.data.region_id,
|
||||
peer_id: datanode_peer.id,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Add region({}) follower procedure is preparing, peer: {datanode_peer:?}",
|
||||
self.data.region_id
|
||||
);
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_submit_request(&mut self) -> Result<Status> {
|
||||
let region_id = self.data.region_id;
|
||||
// Safety: we have already set the peer in `on_prepare``.
|
||||
let peer = self.data.peer.clone().unwrap();
|
||||
let create_follower = CreateFollower::new(region_id, peer);
|
||||
let instruction = create_follower
|
||||
.build_open_region_instruction(self.data.region_info().unwrap())
|
||||
.await?;
|
||||
create_follower
|
||||
.send_open_region_instruction(&self.context, instruction)
|
||||
.await?;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_update_metadata(&mut self) -> Result<Status> {
|
||||
// Safety: we have already load the table route in `on_prepare``.
|
||||
let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap();
|
||||
|
||||
let mut new_region_routes = phy_table_route.region_routes.clone();
|
||||
for region_route in &mut new_region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
region_route
|
||||
.follower_peers
|
||||
.push(self.data.peer.clone().unwrap());
|
||||
}
|
||||
|
||||
// Safety: we have already load the region info in `on_prepare`.
|
||||
let region_info = self.data.region_info().unwrap();
|
||||
let new_region_options = region_info.region_options.clone();
|
||||
let new_region_wal_options = region_info.region_wal_options.clone();
|
||||
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_route(
|
||||
self.data.region_id.table_id(),
|
||||
region_info,
|
||||
current_table_route_value,
|
||||
new_region_routes,
|
||||
&new_region_options,
|
||||
&new_region_wal_options,
|
||||
)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let table_id = self.data.region_id.table_id();
|
||||
// ignore the result
|
||||
let ctx = common_meta::cache_invalidator::Context::default();
|
||||
let _ = self
|
||||
.context
|
||||
.cache_invalidator
|
||||
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
|
||||
.await;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Procedure for AddRegionFollowerProcedure {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let state = &self.data.state;
|
||||
|
||||
let _timer = metrics::METRIC_META_ADD_REGION_FOLLOWER_EXECUTE
|
||||
.with_label_values(&[state.as_ref()])
|
||||
.start_timer();
|
||||
|
||||
match state {
|
||||
AlterRegionFollowerState::Prepare => self.on_prepare().await,
|
||||
AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await,
|
||||
AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await,
|
||||
}
|
||||
.map_err(|e| {
|
||||
if e.is_retryable() {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
serde_json::to_string(&self.data).context(ToJsonSnafu)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
LockKey::new(self.data.lock_key())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
|
||||
|
||||
use super::*;
|
||||
use crate::procedure::region_follower::test_util::TestingEnv;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lock_key() {
|
||||
let env = TestingEnv::new();
|
||||
let context = env.new_context();
|
||||
|
||||
let procedure = AddRegionFollowerProcedure::new(
|
||||
"test_catalog".to_string(),
|
||||
"test_schema".to_string(),
|
||||
RegionId::new(1, 1),
|
||||
1,
|
||||
context,
|
||||
);
|
||||
|
||||
let key = procedure.lock_key();
|
||||
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(keys.len(), 4);
|
||||
assert!(keys.contains(&CatalogLock::Read("test_catalog").into()));
|
||||
assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into()));
|
||||
assert!(keys.contains(&TableLock::Write(1).into()));
|
||||
assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into()));
|
||||
}
|
||||
}
|
||||
252
src/meta-srv/src/procedure/region_follower/create.rs
Normal file
252
src/meta-srv/src/procedure/region_follower/create.rs
Normal file
@@ -0,0 +1,252 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
|
||||
use common_meta::key::datanode_table::RegionInfo;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::RegionIdent;
|
||||
use common_telemetry::info;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::Context;
|
||||
use crate::error::{self, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
/// Uses lease time of a region as the timeout of opening a follower region.
|
||||
const OPEN_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
|
||||
|
||||
pub(crate) struct CreateFollower {
|
||||
region_id: RegionId,
|
||||
// The peer of the datanode to add region follower.
|
||||
peer: Peer,
|
||||
}
|
||||
|
||||
impl CreateFollower {
|
||||
pub fn new(region_id: RegionId, peer: Peer) -> Self {
|
||||
Self { region_id, peer }
|
||||
}
|
||||
|
||||
/// Builds the open region instruction for the region follower.
|
||||
pub(crate) async fn build_open_region_instruction(
|
||||
&self,
|
||||
region_info: RegionInfo,
|
||||
) -> Result<Instruction> {
|
||||
let datanode_id = self.peer.id;
|
||||
let table_id = self.region_id.table_id();
|
||||
let region_number = self.region_id.region_number();
|
||||
|
||||
let RegionInfo {
|
||||
region_storage_path,
|
||||
region_options,
|
||||
region_wal_options,
|
||||
engine,
|
||||
} = region_info;
|
||||
|
||||
let region_ident = RegionIdent {
|
||||
datanode_id,
|
||||
table_id,
|
||||
region_number,
|
||||
engine,
|
||||
};
|
||||
|
||||
let open_instruction = Instruction::OpenRegion(OpenRegion::new(
|
||||
region_ident,
|
||||
®ion_storage_path,
|
||||
region_options,
|
||||
region_wal_options,
|
||||
true,
|
||||
));
|
||||
|
||||
Ok(open_instruction)
|
||||
}
|
||||
|
||||
/// Sends the open region instruction to the datanode.
|
||||
pub(crate) async fn send_open_region_instruction(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
instruction: Instruction,
|
||||
) -> Result<()> {
|
||||
// TODO(jeremy): register the opening_region_keeper
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Open a follower region: {}", self.region_id),
|
||||
&format!("Metasrv@{}", ctx.server_addr),
|
||||
&format!("Datanode-{}@{}", self.peer.id, self.peer.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&instruction,
|
||||
)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: instruction.to_string(),
|
||||
})?;
|
||||
|
||||
let ch = Channel::Datanode(self.peer.id);
|
||||
let now = Instant::now();
|
||||
let receiver = ctx
|
||||
.mailbox
|
||||
.send(&ch, msg, OPEN_REGION_FOLLOWER_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
match receiver.await? {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
info!(
|
||||
"Received open region follower reply: {:?}, region: {}, elapsed: {:?}",
|
||||
reply,
|
||||
self.region_id,
|
||||
now.elapsed()
|
||||
);
|
||||
let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "expect open region follower reply",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
if result {
|
||||
Ok(())
|
||||
} else {
|
||||
error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Region {} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
|
||||
self.region_id,
|
||||
&self.peer,
|
||||
now.elapsed()
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
Err(error::Error::MailboxTimeout { .. }) => {
|
||||
let reason = format!(
|
||||
"Mailbox received timeout for open region follower {} on datanode {:?}, elapsed: {:?}",
|
||||
self.region_id,
|
||||
&self.peer,
|
||||
now.elapsed()
|
||||
);
|
||||
error::RetryLaterSnafu { reason }.fail()
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_meta::DatanodeId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_follower::test_util::TestingEnv;
|
||||
use crate::procedure::test_util::{new_close_region_reply, send_mock_reply};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_datanode_is_unreachable() {
|
||||
let env = TestingEnv::new();
|
||||
let ctx = env.new_context();
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let peer = Peer::new(1, "127.0.0.1:8080");
|
||||
let create_follower = CreateFollower::new(region_id, peer.clone());
|
||||
let instruction = mock_open_region_instruction(peer.id, region_id);
|
||||
let err = create_follower
|
||||
.send_open_region_instruction(&ctx, instruction)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::PusherNotFound { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unexpected_instruction_reply() {
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.new_context();
|
||||
let mailbox_ctx = env.mailbox_context_mut();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let peer = Peer::new(1, "127.0.0.1:8080");
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
|
||||
.await;
|
||||
|
||||
// Sends an timeout error.
|
||||
send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
|
||||
|
||||
let create_follower = CreateFollower::new(region_id, peer.clone());
|
||||
let instruction = mock_open_region_instruction(peer.id, region_id);
|
||||
let err = create_follower
|
||||
.send_open_region_instruction(&ctx, instruction)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_instruction_exceeded_deadline() {
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.new_context();
|
||||
let mailbox_ctx = env.mailbox_context_mut();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let peer = Peer::new(1, "127.0.0.1:8080");
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
|
||||
.await;
|
||||
|
||||
// Sends an timeout error.
|
||||
send_mock_reply(mailbox, rx, |id| {
|
||||
Err(error::MailboxTimeoutSnafu { id }.build())
|
||||
});
|
||||
|
||||
let create_follower = CreateFollower::new(region_id, peer.clone());
|
||||
let instruction = mock_open_region_instruction(peer.id, region_id);
|
||||
let err = create_follower
|
||||
.send_open_region_instruction(&ctx, instruction)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::RetryLater { .. });
|
||||
assert!(err.is_retryable());
|
||||
}
|
||||
|
||||
fn mock_open_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
|
||||
Instruction::OpenRegion(OpenRegion {
|
||||
region_ident: RegionIdent {
|
||||
datanode_id,
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
engine: "mito2".to_string(),
|
||||
},
|
||||
region_storage_path: "/tmp".to_string(),
|
||||
region_options: Default::default(),
|
||||
region_wal_options: Default::default(),
|
||||
skip_wal_replay: true,
|
||||
})
|
||||
}
|
||||
}
|
||||
193
src/meta-srv/src/procedure/region_follower/manager.rs
Normal file
193
src/meta-srv/src/procedure/region_follower/manager.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::rpc::procedure::{AddRegionFollowerRequest, RemoveRegionFollowerRequest};
|
||||
use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
|
||||
use common_telemetry::info;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use super::remove_region_follower::RemoveRegionFollowerProcedure;
|
||||
use crate::error::{self, Result};
|
||||
use crate::procedure::region_follower::add_region_follower::AddRegionFollowerProcedure;
|
||||
use crate::procedure::region_follower::Context;
|
||||
|
||||
pub struct RegionFollowerManager {
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
default_context: Context,
|
||||
}
|
||||
|
||||
impl RegionFollowerManager {
|
||||
pub fn new(procedure_manager: ProcedureManagerRef, default_context: Context) -> Self {
|
||||
Self {
|
||||
procedure_manager,
|
||||
default_context,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_context(&self) -> Context {
|
||||
self.default_context.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn try_start(&self) -> Result<()> {
|
||||
// register add region follower procedure
|
||||
let context = self.new_context();
|
||||
let type_name = AddRegionFollowerProcedure::TYPE_NAME;
|
||||
self.procedure_manager
|
||||
.register_loader(
|
||||
type_name,
|
||||
Box::new(move |json| {
|
||||
let context = context.clone();
|
||||
AddRegionFollowerProcedure::from_json(json, context).map(|p| Box::new(p) as _)
|
||||
}),
|
||||
)
|
||||
.context(error::RegisterProcedureLoaderSnafu { type_name })?;
|
||||
|
||||
// register remove region follower procedure
|
||||
let context = self.new_context();
|
||||
let type_name = RemoveRegionFollowerProcedure::TYPE_NAME;
|
||||
self.procedure_manager
|
||||
.register_loader(
|
||||
type_name,
|
||||
Box::new(move |json| {
|
||||
let context = context.clone();
|
||||
RemoveRegionFollowerProcedure::from_json(json, context)
|
||||
.map(|p| Box::new(p) as _)
|
||||
}),
|
||||
)
|
||||
.context(error::RegisterProcedureLoaderSnafu { type_name })?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn submit_add_follower_procedure(
|
||||
&self,
|
||||
req: AddRegionFollowerRequest,
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let AddRegionFollowerRequest { region_id, peer_id } = req;
|
||||
let region_id = RegionId::from_u64(region_id);
|
||||
let table_id = region_id.table_id();
|
||||
let ctx = self.new_context();
|
||||
|
||||
// get the table info
|
||||
let table_info = ctx
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?
|
||||
.context(error::TableInfoNotFoundSnafu { table_id })?
|
||||
.into_inner();
|
||||
|
||||
let TableName {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
..
|
||||
} = table_info.table_name();
|
||||
|
||||
let procedure =
|
||||
AddRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx);
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
let procedure_id = procedure_with_id.id;
|
||||
info!("Starting add region follower procedure {procedure_id} for {req:?}");
|
||||
let mut watcher = self
|
||||
.procedure_manager
|
||||
.submit(procedure_with_id)
|
||||
.await
|
||||
.context(error::SubmitProcedureSnafu)?;
|
||||
let output = watcher::wait(&mut watcher)
|
||||
.await
|
||||
.context(error::WaitProcedureSnafu)?;
|
||||
|
||||
Ok((procedure_id, output))
|
||||
}
|
||||
|
||||
pub async fn submit_remove_follower_procedure(
|
||||
&self,
|
||||
req: RemoveRegionFollowerRequest,
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
let RemoveRegionFollowerRequest { region_id, peer_id } = req;
|
||||
let region_id = RegionId::from_u64(region_id);
|
||||
let table_id = region_id.table_id();
|
||||
let ctx = self.new_context();
|
||||
|
||||
// get the table info
|
||||
let table_info = ctx
|
||||
.table_metadata_manager
|
||||
.table_info_manager()
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?
|
||||
.context(error::TableInfoNotFoundSnafu { table_id })?
|
||||
.into_inner();
|
||||
|
||||
let TableName {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
..
|
||||
} = table_info.table_name();
|
||||
|
||||
let procedure =
|
||||
RemoveRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx);
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
let procedure_id = procedure_with_id.id;
|
||||
info!("Starting remove region follower procedure {procedure_id} for {req:?}");
|
||||
let mut watcher = self
|
||||
.procedure_manager
|
||||
.submit(procedure_with_id)
|
||||
.await
|
||||
.context(error::SubmitProcedureSnafu)?;
|
||||
let output = watcher::wait(&mut watcher)
|
||||
.await
|
||||
.context(error::WaitProcedureSnafu)?;
|
||||
|
||||
Ok((procedure_id, output))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use super::*;
|
||||
use crate::procedure::region_follower::test_util::TestingEnv;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_submit_procedure_table_not_found() {
|
||||
let env = TestingEnv::new();
|
||||
let ctx = env.new_context();
|
||||
let region_follower_manager = RegionFollowerManager::new(env.procedure_manager(), ctx);
|
||||
let req = AddRegionFollowerRequest {
|
||||
region_id: 1,
|
||||
peer_id: 2,
|
||||
};
|
||||
let err = region_follower_manager
|
||||
.submit_add_follower_procedure(req)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, error::Error::TableInfoNotFound { .. });
|
||||
|
||||
let req = RemoveRegionFollowerRequest {
|
||||
region_id: 1,
|
||||
peer_id: 2,
|
||||
};
|
||||
let err = region_follower_manager
|
||||
.submit_remove_follower_procedure(req)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, error::Error::TableInfoNotFound { .. });
|
||||
}
|
||||
}
|
||||
234
src/meta-srv/src/procedure/region_follower/remove.rs
Normal file
234
src/meta-srv/src/procedure/region_follower/remove.rs
Normal file
@@ -0,0 +1,234 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
|
||||
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
|
||||
use common_meta::key::datanode_table::RegionInfo;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::RegionIdent;
|
||||
use common_telemetry::info;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::Context;
|
||||
use crate::error::{self, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
/// Uses lease time of a region as the timeout of closing a follower region.
|
||||
const CLOSE_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
|
||||
|
||||
pub(crate) struct RemoveFollower {
|
||||
region_id: RegionId,
|
||||
// The peer of the datanode to add region follower.
|
||||
peer: Peer,
|
||||
}
|
||||
|
||||
impl RemoveFollower {
|
||||
pub fn new(region_id: RegionId, peer: Peer) -> Self {
|
||||
Self { region_id, peer }
|
||||
}
|
||||
|
||||
/// Builds the close region instruction for the region follower.
|
||||
pub(crate) async fn build_close_region_instruction(
|
||||
&self,
|
||||
region_info: RegionInfo,
|
||||
) -> Result<Instruction> {
|
||||
let datanode_id = self.peer.id;
|
||||
let table_id = self.region_id.table_id();
|
||||
let region_number = self.region_id.region_number();
|
||||
|
||||
let RegionInfo { engine, .. } = region_info;
|
||||
|
||||
let region_ident = RegionIdent {
|
||||
datanode_id,
|
||||
table_id,
|
||||
region_number,
|
||||
engine,
|
||||
};
|
||||
|
||||
let close_instruction = Instruction::CloseRegion(region_ident);
|
||||
|
||||
Ok(close_instruction)
|
||||
}
|
||||
|
||||
/// Sends the close region instruction to the datanode.
|
||||
pub(crate) async fn send_close_region_instruction(
|
||||
&self,
|
||||
ctx: &Context,
|
||||
instruction: Instruction,
|
||||
) -> Result<()> {
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Close a follower region: {}", self.region_id),
|
||||
&format!("Metasrv@{}", ctx.server_addr),
|
||||
&format!("Datanode-{}@{}", self.peer.id, self.peer.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&instruction,
|
||||
)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: instruction.to_string(),
|
||||
})?;
|
||||
|
||||
let ch = Channel::Datanode(self.peer.id);
|
||||
let now = Instant::now();
|
||||
let receiver = ctx
|
||||
.mailbox
|
||||
.send(&ch, msg, CLOSE_REGION_FOLLOWER_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
match receiver.await? {
|
||||
Ok(msg) => {
|
||||
let reply = HeartbeatMailbox::json_reply(&msg)?;
|
||||
info!(
|
||||
"Received close region follower reply: {:?}, region: {}, elapsed: {:?}",
|
||||
reply,
|
||||
self.region_id,
|
||||
now.elapsed()
|
||||
);
|
||||
let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
|
||||
return error::UnexpectedInstructionReplySnafu {
|
||||
mailbox_message: msg.to_string(),
|
||||
reason: "expect close region follower reply",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
if result {
|
||||
Ok(())
|
||||
} else {
|
||||
error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"Region {} is not closed by datanode {:?}, error: {error:?}, elapsed: {:?}",
|
||||
self.region_id,
|
||||
&self.peer,
|
||||
now.elapsed()
|
||||
),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
Err(error::Error::MailboxTimeout { .. }) => {
|
||||
let reason = format!(
|
||||
"Mailbox received timeout for close region follower {} on datanode {:?}, elapsed: {:?}",
|
||||
self.region_id,
|
||||
&self.peer,
|
||||
now.elapsed()
|
||||
);
|
||||
error::RetryLaterSnafu { reason }.fail()
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
use common_meta::DatanodeId;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_follower::test_util::TestingEnv;
|
||||
use crate::procedure::test_util::{new_open_region_reply, send_mock_reply};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_datanode_is_unreachable() {
|
||||
let env = TestingEnv::new();
|
||||
let ctx = env.new_context();
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let peer = Peer::new(1, "127.0.0.1:8080");
|
||||
let remove_follower = RemoveFollower::new(region_id, peer.clone());
|
||||
let instruction = mock_close_region_instruction(peer.id, region_id);
|
||||
let err = remove_follower
|
||||
.send_close_region_instruction(&ctx, instruction)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::PusherNotFound { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unexpected_instruction_reply() {
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.new_context();
|
||||
let mailbox_ctx = env.mailbox_context_mut();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let peer = Peer::new(1, "127.0.0.1:8080");
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
|
||||
.await;
|
||||
|
||||
// Sends an timeout error.
|
||||
send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, false, None)));
|
||||
|
||||
let remove_follower = RemoveFollower::new(region_id, peer.clone());
|
||||
let instruction = mock_close_region_instruction(peer.id, region_id);
|
||||
let err = remove_follower
|
||||
.send_close_region_instruction(&ctx, instruction)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::UnexpectedInstructionReply { .. });
|
||||
assert!(!err.is_retryable());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_instruction_exceeded_deadline() {
|
||||
let mut env = TestingEnv::new();
|
||||
let ctx = env.new_context();
|
||||
let mailbox_ctx = env.mailbox_context_mut();
|
||||
let mailbox = mailbox_ctx.mailbox().clone();
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let peer = Peer::new(1, "127.0.0.1:8080");
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
mailbox_ctx
|
||||
.insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
|
||||
.await;
|
||||
|
||||
// Sends an timeout error.
|
||||
send_mock_reply(mailbox, rx, |id| {
|
||||
Err(error::MailboxTimeoutSnafu { id }.build())
|
||||
});
|
||||
|
||||
let remove_follower = RemoveFollower::new(region_id, peer.clone());
|
||||
let instruction = mock_close_region_instruction(peer.id, region_id);
|
||||
let err = remove_follower
|
||||
.send_close_region_instruction(&ctx, instruction)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_matches!(err, Error::RetryLater { .. });
|
||||
assert!(err.is_retryable());
|
||||
}
|
||||
|
||||
fn mock_close_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
|
||||
Instruction::CloseRegion(RegionIdent {
|
||||
datanode_id,
|
||||
table_id: region_id.table_id(),
|
||||
region_number: region_id.region_number(),
|
||||
engine: "mito2".to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,233 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_procedure::error::ToJsonSnafu;
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
Result as ProcedureResult, Status,
|
||||
};
|
||||
use common_telemetry::info;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::remove::RemoveFollower;
|
||||
use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context};
|
||||
use crate::error::{self, Result};
|
||||
use crate::metrics;
|
||||
|
||||
/// The procedure to remove a region follower.
|
||||
pub struct RemoveRegionFollowerProcedure {
|
||||
pub data: AlterRegionFollowerData,
|
||||
pub context: Context,
|
||||
}
|
||||
|
||||
impl RemoveRegionFollowerProcedure {
|
||||
pub const TYPE_NAME: &'static str = "metasrv-procedure::RemoveRegionFollower";
|
||||
|
||||
pub fn new(
|
||||
catalog: String,
|
||||
schema: String,
|
||||
region_id: RegionId,
|
||||
peer_id: u64,
|
||||
context: Context,
|
||||
) -> Self {
|
||||
Self {
|
||||
data: AlterRegionFollowerData {
|
||||
catalog,
|
||||
schema,
|
||||
region_id,
|
||||
peer_id,
|
||||
peer: None,
|
||||
datanode_table_value: None,
|
||||
table_route: None,
|
||||
state: AlterRegionFollowerState::Prepare,
|
||||
},
|
||||
context,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
|
||||
let data: AlterRegionFollowerData = serde_json::from_str(json).unwrap();
|
||||
Ok(Self { data, context })
|
||||
}
|
||||
|
||||
pub async fn on_prepare(&mut self) -> Result<Status> {
|
||||
// loads the datanode peer and check peer is alive
|
||||
self.data.peer = self.data.load_datanode_peer(&self.context).await?;
|
||||
|
||||
// loads the datanode table value
|
||||
self.data.datanode_table_value = self.data.load_datanode_table_value(&self.context).await?;
|
||||
|
||||
// loads the table route of the region
|
||||
self.data.table_route = self.data.load_table_route(&self.context).await?;
|
||||
let table_route = self.data.physical_table_route().unwrap();
|
||||
|
||||
// check if the destination peer has this region
|
||||
for region_route in &table_route.region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
ensure!(
|
||||
!region_route
|
||||
.follower_peers
|
||||
.iter()
|
||||
.any(|peer| peer.id == self.data.peer_id),
|
||||
error::RegionFollowerNotExistsSnafu {
|
||||
region_id: self.data.region_id,
|
||||
peer_id: self.data.peer_id,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
"Remove region({}) follower procedure is preparing, peer: {:?}",
|
||||
self.data.region_id,
|
||||
self.data.datanode_peer()
|
||||
);
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_submit_request(&mut self) -> Result<Status> {
|
||||
let region_id = self.data.region_id;
|
||||
// Safety: we have already set the peer in `on_prepare``.
|
||||
let peer = self.data.peer.clone().unwrap();
|
||||
let remove_follower = RemoveFollower::new(region_id, peer);
|
||||
let instruction = remove_follower
|
||||
.build_close_region_instruction(self.data.region_info().unwrap())
|
||||
.await?;
|
||||
remove_follower
|
||||
.send_close_region_instruction(&self.context, instruction)
|
||||
.await?;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_update_metadata(&mut self) -> Result<Status> {
|
||||
// Safety: we have already load the table route in `on_prepare``.
|
||||
let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap();
|
||||
|
||||
let mut new_region_routes = phy_table_route.region_routes.clone();
|
||||
for region_route in &mut new_region_routes {
|
||||
if region_route.region.id != self.data.region_id {
|
||||
continue;
|
||||
}
|
||||
// remove the follower peer from the region route
|
||||
region_route
|
||||
.follower_peers
|
||||
.retain(|peer| peer.id != self.data.peer_id);
|
||||
}
|
||||
|
||||
// Safety: we have already load the region info in `on_prepare`.
|
||||
let region_info = self.data.region_info().unwrap();
|
||||
let new_region_options = region_info.region_options.clone();
|
||||
let new_region_wal_options = region_info.region_wal_options.clone();
|
||||
|
||||
self.context
|
||||
.table_metadata_manager
|
||||
.update_table_route(
|
||||
self.data.region_id.table_id(),
|
||||
region_info,
|
||||
current_table_route_value,
|
||||
new_region_routes,
|
||||
&new_region_options,
|
||||
&new_region_wal_options,
|
||||
)
|
||||
.await
|
||||
.context(error::TableMetadataManagerSnafu)?;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
pub async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let table_id = self.data.region_id.table_id();
|
||||
// ignore the result
|
||||
let ctx = common_meta::cache_invalidator::Context::default();
|
||||
let _ = self
|
||||
.context
|
||||
.cache_invalidator
|
||||
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
|
||||
.await;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Procedure for RemoveRegionFollowerProcedure {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let state = &self.data.state;
|
||||
|
||||
let _timer = metrics::METRIC_META_REMOVE_REGION_FOLLOWER_EXECUTE
|
||||
.with_label_values(&[state.as_ref()])
|
||||
.start_timer();
|
||||
|
||||
match state {
|
||||
AlterRegionFollowerState::Prepare => self.on_prepare().await,
|
||||
AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await,
|
||||
AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await,
|
||||
}
|
||||
.map_err(|e| {
|
||||
if e.is_retryable() {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
serde_json::to_string(&self.data).context(ToJsonSnafu)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
LockKey::new(self.data.lock_key())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
|
||||
|
||||
use super::*;
|
||||
use crate::procedure::region_follower::test_util::TestingEnv;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lock_key() {
|
||||
let env = TestingEnv::new();
|
||||
let context = env.new_context();
|
||||
|
||||
let procedure = RemoveRegionFollowerProcedure::new(
|
||||
"test_catalog".to_string(),
|
||||
"test_schema".to_string(),
|
||||
RegionId::new(1, 1),
|
||||
1,
|
||||
context,
|
||||
);
|
||||
|
||||
let key = procedure.lock_key();
|
||||
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(keys.len(), 4);
|
||||
assert!(keys.contains(&CatalogLock::Read("test_catalog").into()));
|
||||
assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into()));
|
||||
assert!(keys.contains(&TableLock::Write(1).into()));
|
||||
assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into()));
|
||||
}
|
||||
}
|
||||
97
src/meta-srv/src/procedure/region_follower/test_util.rs
Normal file
97
src/meta-srv/src/procedure/region_follower/test_util.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::ResettableKvBackendRef;
|
||||
use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::state_store::KvStateStore;
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
|
||||
use super::Context;
|
||||
use crate::cache_invalidator::MetasrvCacheInvalidator;
|
||||
use crate::cluster::MetaPeerClientBuilder;
|
||||
use crate::metasrv::MetasrvInfo;
|
||||
use crate::procedure::test_util::MailboxContext;
|
||||
|
||||
/// `TestingEnv` provides components during the tests.
|
||||
pub struct TestingEnv {
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
mailbox_ctx: MailboxContext,
|
||||
server_addr: String,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
in_memory: ResettableKvBackendRef,
|
||||
}
|
||||
|
||||
impl Default for TestingEnv {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl TestingEnv {
|
||||
pub fn new() -> Self {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
let in_memory = Arc::new(MemoryKvBackend::new());
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
|
||||
let mailbox_sequence =
|
||||
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
|
||||
|
||||
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
|
||||
|
||||
let state_store = Arc::new(KvStateStore::new(kv_backend));
|
||||
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
|
||||
|
||||
Self {
|
||||
table_metadata_manager,
|
||||
mailbox_ctx,
|
||||
server_addr: "localhost".to_string(),
|
||||
procedure_manager,
|
||||
in_memory,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_context(&self) -> Context {
|
||||
Context {
|
||||
table_metadata_manager: self.table_metadata_manager.clone(),
|
||||
mailbox: self.mailbox_ctx.mailbox().clone(),
|
||||
server_addr: self.server_addr.clone(),
|
||||
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
|
||||
self.mailbox_ctx.mailbox().clone(),
|
||||
MetasrvInfo {
|
||||
server_addr: self.server_addr.to_string(),
|
||||
},
|
||||
)),
|
||||
meta_peer_client: MetaPeerClientBuilder::default()
|
||||
.election(None)
|
||||
.in_memory(self.in_memory.clone())
|
||||
.build()
|
||||
.map(Arc::new)
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mailbox_context_mut(&mut self) -> &mut MailboxContext {
|
||||
&mut self.mailbox_ctx
|
||||
}
|
||||
|
||||
pub fn procedure_manager(&self) -> ProcedureManagerRef {
|
||||
self.procedure_manager.clone()
|
||||
}
|
||||
}
|
||||
@@ -579,6 +579,9 @@ mod tests {
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
|
||||
use crate::procedure::region_migration::test_util::*;
|
||||
use crate::procedure::test_util::{
|
||||
new_downgrade_region_reply, new_open_region_reply, new_upgrade_region_reply,
|
||||
};
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
|
||||
@@ -84,7 +84,7 @@ impl CloseDowngradedRegion {
|
||||
let downgrade_leader_datanode = &pc.from_peer;
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Close downgraded region: {}", region_id),
|
||||
&format!("Meta@{}", ctx.server_addr()),
|
||||
&format!("Metasrv@{}", ctx.server_addr()),
|
||||
&format!(
|
||||
"Datanode-{}@{}",
|
||||
downgrade_leader_datanode.id, downgrade_leader_datanode.addr
|
||||
|
||||
@@ -150,7 +150,7 @@ impl DowngradeLeaderRegion {
|
||||
let leader = &ctx.persistent_ctx.from_peer;
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Downgrade leader region: {}", region_id),
|
||||
&format!("Meta@{}", ctx.server_addr()),
|
||||
&format!("Metasrv@{}", ctx.server_addr()),
|
||||
&format!("Datanode-{}@{}", leader.id, leader.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&downgrade_instruction,
|
||||
@@ -282,10 +282,11 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_migration::test_util::{
|
||||
new_close_region_reply, new_downgrade_region_reply, send_mock_reply, TestingEnv,
|
||||
};
|
||||
use crate::procedure::region_migration::test_util::TestingEnv;
|
||||
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
|
||||
use crate::procedure::test_util::{
|
||||
new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
|
||||
};
|
||||
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
PersistentContext {
|
||||
|
||||
@@ -126,7 +126,7 @@ impl OpenCandidateRegion {
|
||||
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Open candidate region: {}", region_id),
|
||||
&format!("Meta@{}", ctx.server_addr()),
|
||||
&format!("Metasrv@{}", ctx.server_addr()),
|
||||
&format!("Datanode-{}@{}", candidate.id, candidate.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&open_instruction,
|
||||
@@ -200,10 +200,11 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_migration::test_util::{
|
||||
self, new_close_region_reply, new_open_region_reply, send_mock_reply, TestingEnv,
|
||||
};
|
||||
use crate::procedure::region_migration::test_util::{self, TestingEnv};
|
||||
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
|
||||
use crate::procedure::test_util::{
|
||||
new_close_region_reply, new_open_region_reply, send_mock_reply,
|
||||
};
|
||||
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
|
||||
|
||||
@@ -18,12 +18,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::mailbox_message::Payload;
|
||||
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_meta::ddl::NoopRegionFailureDetectorControl;
|
||||
use common_meta::instruction::{
|
||||
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
|
||||
};
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
@@ -31,22 +27,19 @@ use common_meta::kv_backend::KvBackendRef;
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_meta::sequence::{Sequence, SequenceBuilder};
|
||||
use common_meta::sequence::SequenceBuilder;
|
||||
use common_meta::state_store::KvStateStore;
|
||||
use common_meta::DatanodeId;
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status};
|
||||
use common_procedure_test::MockContextProvider;
|
||||
use common_telemetry::debug;
|
||||
use common_time::util::current_time_millis;
|
||||
use futures::future::BoxFuture;
|
||||
use store_api::storage::RegionId;
|
||||
use table::metadata::RawTableInfo;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
use crate::cache_invalidator::MetasrvCacheInvalidator;
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
|
||||
use crate::metasrv::MetasrvInfo;
|
||||
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
|
||||
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
|
||||
@@ -59,40 +52,8 @@ use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandida
|
||||
use crate::procedure::region_migration::{
|
||||
Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext,
|
||||
};
|
||||
use crate::service::mailbox::{Channel, MailboxRef};
|
||||
|
||||
pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
|
||||
|
||||
/// The context of mailbox.
|
||||
pub struct MailboxContext {
|
||||
mailbox: MailboxRef,
|
||||
// The pusher is used in the mailbox.
|
||||
pushers: Pushers,
|
||||
}
|
||||
|
||||
impl MailboxContext {
|
||||
pub fn new(sequence: Sequence) -> Self {
|
||||
let pushers = Pushers::default();
|
||||
let mailbox = HeartbeatMailbox::create(pushers.clone(), sequence);
|
||||
|
||||
Self { mailbox, pushers }
|
||||
}
|
||||
|
||||
/// Inserts a pusher for `datanode_id`
|
||||
pub async fn insert_heartbeat_response_receiver(
|
||||
&mut self,
|
||||
channel: Channel,
|
||||
tx: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
|
||||
) {
|
||||
let pusher_id = channel.pusher_id();
|
||||
let pusher = Pusher::new(tx);
|
||||
let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
|
||||
}
|
||||
|
||||
pub fn mailbox(&self) -> &MailboxRef {
|
||||
&self.mailbox
|
||||
}
|
||||
}
|
||||
use crate::procedure::test_util::{send_mock_reply, MailboxContext};
|
||||
use crate::service::mailbox::Channel;
|
||||
|
||||
/// `TestingEnv` provides components during the tests.
|
||||
pub struct TestingEnv {
|
||||
@@ -157,7 +118,7 @@ impl TestingEnv {
|
||||
server_addr: self.server_addr.to_string(),
|
||||
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
|
||||
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
|
||||
self.mailbox_ctx.mailbox.clone(),
|
||||
self.mailbox_ctx.mailbox().clone(),
|
||||
MetasrvInfo {
|
||||
server_addr: self.server_addr.to_string(),
|
||||
},
|
||||
@@ -210,105 +171,6 @@ impl TestingEnv {
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::OpenRegion] reply.
|
||||
pub(crate) fn new_open_region_reply(
|
||||
id: u64,
|
||||
result: bool,
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error }))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::CloseRegion] reply.
|
||||
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
|
||||
result: false,
|
||||
error: None,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::DowngradeRegion] reply.
|
||||
pub fn new_downgrade_region_reply(
|
||||
id: u64,
|
||||
last_entry_id: Option<u64>,
|
||||
exist: bool,
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
|
||||
last_entry_id,
|
||||
exists: exist,
|
||||
error,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::UpgradeRegion] reply.
|
||||
pub fn new_upgrade_region_reply(
|
||||
id: u64,
|
||||
ready: bool,
|
||||
exists: bool,
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply {
|
||||
ready,
|
||||
exists,
|
||||
error,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a mock reply.
|
||||
pub fn send_mock_reply(
|
||||
mailbox: MailboxRef,
|
||||
mut rx: MockHeartbeatReceiver,
|
||||
msg: impl Fn(u64) -> Result<MailboxMessage> + Send + 'static,
|
||||
) {
|
||||
common_runtime::spawn_global(async move {
|
||||
while let Some(Ok(resp)) = rx.recv().await {
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Generates a [PersistentContext].
|
||||
pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext {
|
||||
PersistentContext {
|
||||
|
||||
@@ -112,7 +112,7 @@ impl UpgradeCandidateRegion {
|
||||
|
||||
let msg = MailboxMessage::json_message(
|
||||
&format!("Upgrade candidate region: {}", region_id),
|
||||
&format!("Meta@{}", ctx.server_addr()),
|
||||
&format!("Metasrv@{}", ctx.server_addr()),
|
||||
&format!("Datanode-{}@{}", candidate.id, candidate.addr),
|
||||
common_time::util::current_time_millis(),
|
||||
&upgrade_instruction,
|
||||
@@ -226,10 +226,11 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::procedure::region_migration::test_util::{
|
||||
new_close_region_reply, new_upgrade_region_reply, send_mock_reply, TestingEnv,
|
||||
};
|
||||
use crate::procedure::region_migration::test_util::TestingEnv;
|
||||
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
|
||||
use crate::procedure::test_util::{
|
||||
new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
|
||||
};
|
||||
|
||||
fn new_persistent_context() -> PersistentContext {
|
||||
PersistentContext {
|
||||
|
||||
158
src/meta-srv/src/procedure/test_util.rs
Normal file
158
src/meta-srv/src/procedure/test_util.rs
Normal file
@@ -0,0 +1,158 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::mailbox_message::Payload;
|
||||
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
|
||||
use common_meta::instruction::{
|
||||
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
|
||||
};
|
||||
use common_meta::sequence::Sequence;
|
||||
use common_time::util::current_time_millis;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
|
||||
use crate::service::mailbox::{Channel, MailboxRef};
|
||||
|
||||
pub type MockHeartbeatReceiver = Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>;
|
||||
|
||||
/// The context of mailbox.
|
||||
pub struct MailboxContext {
|
||||
mailbox: MailboxRef,
|
||||
// The pusher is used in the mailbox.
|
||||
pushers: Pushers,
|
||||
}
|
||||
|
||||
impl MailboxContext {
|
||||
pub fn new(sequence: Sequence) -> Self {
|
||||
let pushers = Pushers::default();
|
||||
let mailbox = HeartbeatMailbox::create(pushers.clone(), sequence);
|
||||
|
||||
Self { mailbox, pushers }
|
||||
}
|
||||
|
||||
/// Inserts a pusher for `datanode_id`
|
||||
pub async fn insert_heartbeat_response_receiver(
|
||||
&mut self,
|
||||
channel: Channel,
|
||||
tx: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
|
||||
) {
|
||||
let pusher_id = channel.pusher_id();
|
||||
let pusher = Pusher::new(tx);
|
||||
let _ = self.pushers.insert(pusher_id.string_key(), pusher).await;
|
||||
}
|
||||
|
||||
pub fn mailbox(&self) -> &MailboxRef {
|
||||
&self.mailbox
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a mock reply.
|
||||
pub fn send_mock_reply(
|
||||
mailbox: MailboxRef,
|
||||
mut rx: MockHeartbeatReceiver,
|
||||
msg: impl Fn(u64) -> Result<MailboxMessage> + Send + 'static,
|
||||
) {
|
||||
common_runtime::spawn_global(async move {
|
||||
while let Some(Ok(resp)) = rx.recv().await {
|
||||
let reply_id = resp.mailbox_message.unwrap().id;
|
||||
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::OpenRegion] reply.
|
||||
pub(crate) fn new_open_region_reply(
|
||||
id: u64,
|
||||
result: bool,
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error }))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::CloseRegion] reply.
|
||||
pub fn new_close_region_reply(id: u64) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply {
|
||||
result: false,
|
||||
error: None,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::DowngradeRegion] reply.
|
||||
pub fn new_downgrade_region_reply(
|
||||
id: u64,
|
||||
last_entry_id: Option<u64>,
|
||||
exist: bool,
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
|
||||
last_entry_id,
|
||||
exists: exist,
|
||||
error,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a [InstructionReply::UpgradeRegion] reply.
|
||||
pub fn new_upgrade_region_reply(
|
||||
id: u64,
|
||||
ready: bool,
|
||||
exists: bool,
|
||||
error: Option<String>,
|
||||
) -> MailboxMessage {
|
||||
MailboxMessage {
|
||||
id,
|
||||
subject: "mock".to_string(),
|
||||
from: "datanode".to_string(),
|
||||
to: "meta".to_string(),
|
||||
timestamp_millis: current_time_millis(),
|
||||
payload: Some(Payload::Json(
|
||||
serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply {
|
||||
ready,
|
||||
exists,
|
||||
error,
|
||||
}))
|
||||
.unwrap(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user