fix: filter out outdated heartbeat (#2303)

* fix: filter out outdated heartbeat, #1707

* feat: reorder heartbeat handlers

* refactor: rename `disable_*` options to `enable_*`

* feat: make full use of region leases to facilitate failover

* chore: minor refactor

* chore: address review comments

* feat: log regions going inactive/active
Author:       JeremyHi
Date:         2023-09-03 10:25:50 +08:00
Committed by: Ruihang Xia
Parent:       648b2ae293
Commit:       a7fa40e16d
20 changed files with 218 additions and 132 deletions

View File

@@ -93,7 +93,7 @@ struct StartCommand {
     #[clap(long)]
     use_memory_store: Option<bool>,
     #[clap(long)]
-    disable_region_failover: Option<bool>,
+    enable_region_failover: Option<bool>,
     #[clap(long)]
     http_addr: Option<String>,
     #[clap(long)]
@@ -140,8 +140,8 @@ impl StartCommand {
             opts.use_memory_store = use_memory_store;
         }
-        if let Some(disable_region_failover) = self.disable_region_failover {
-            opts.disable_region_failover = disable_region_failover;
+        if let Some(enable_region_failover) = self.enable_region_failover {
+            opts.enable_region_failover = enable_region_failover;
         }
         if let Some(http_addr) = &self.http_addr {
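
Note: with clap's derive naming, the renamed field surfaces as an `--enable-region-failover` flag, and because the field is an `Option<bool>` it takes an explicit value, e.g. `--enable-region-failover false` (the binary and subcommand names are not shown in this diff). Left unset, the option falls through to the `MetaSrvOptions` default, which stays failover-on (`enable_region_failover: true`; see the metasrv.rs hunks below).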

View File

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashSet};
 use std::ops::Range;
 use std::sync::Arc;
 use std::time::Duration;
@@ -22,17 +22,10 @@ use api::v1::meta::{
     HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader,
     ResponseHeader, Role, PROTOCOL_VERSION,
 };
-pub use check_leader_handler::CheckLeaderHandler;
-pub use collect_stats_handler::CollectStatsHandler;
 use common_meta::instruction::{Instruction, InstructionReply};
 use common_telemetry::{debug, info, timer, warn};
 use dashmap::DashMap;
-pub use failure_handler::RegionFailureHandler;
-pub use keep_lease_handler::KeepLeaseHandler;
 use metrics::{decrement_gauge, increment_gauge};
-pub use on_leader_start_handler::OnLeaderStartHandler;
-pub use persist_stats_handler::PersistStatsHandler;
-pub use response_header_handler::ResponseHeaderHandler;
 use snafu::{OptionExt, ResultExt};
 use tokio::sync::mpsc::Sender;
 use tokio::sync::{oneshot, Notify, RwLock};
@@ -46,17 +39,18 @@ use crate::service::mailbox::{
     BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
 };
 
-mod check_leader_handler;
-mod collect_stats_handler;
-pub(crate) mod failure_handler;
-mod keep_lease_handler;
+pub mod check_leader_handler;
+pub mod collect_stats_handler;
+pub mod failure_handler;
+pub mod filter_inactive_region_stats;
+pub mod keep_lease_handler;
 pub mod mailbox_handler;
 pub mod node_stat;
-mod on_leader_start_handler;
-mod persist_stats_handler;
+pub mod on_leader_start_handler;
+pub mod persist_stats_handler;
 pub mod publish_heartbeat_handler;
-pub(crate) mod region_lease_handler;
-mod response_header_handler;
+pub mod region_lease_handler;
+pub mod response_header_handler;
 
 #[async_trait::async_trait]
 pub trait HeartbeatHandler: Send + Sync {
@@ -81,6 +75,7 @@ pub struct HeartbeatAccumulator {
     pub header: Option<ResponseHeader>,
     pub instructions: Vec<Instruction>,
     pub stat: Option<Stat>,
+    pub inactive_region_ids: HashSet<u64>,
     pub region_lease: Option<RegionLease>,
 }
@@ -404,11 +399,13 @@ mod tests {
     use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION};
     use tokio::sync::mpsc;
 
+    use crate::handler::check_leader_handler::CheckLeaderHandler;
+    use crate::handler::collect_stats_handler::CollectStatsHandler;
     use crate::handler::mailbox_handler::MailboxHandler;
-    use crate::handler::{
-        CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
-        OnLeaderStartHandler, PersistStatsHandler, Pusher, ResponseHeaderHandler,
-    };
+    use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
+    use crate::handler::persist_stats_handler::PersistStatsHandler;
+    use crate::handler::response_header_handler::ResponseHeaderHandler;
+    use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
     use crate::sequence::Sequence;
     use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
     use crate::service::store::memory::MemStore;
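
For orientation, every handler touched by this commit implements the same two-method contract. A minimal no-op sketch, assuming only the trait surface visible in this diff (`is_acceptable` plus `handle`, with the crate's `Result`):

    struct NoopHandler;

    #[async_trait::async_trait]
    impl HeartbeatHandler for NoopHandler {
        // Only heartbeats coming from datanodes are interesting to this handler.
        fn is_acceptable(&self, role: Role) -> bool {
            role == Role::Datanode
        }

        // Inspect the request, then record any results into the shared accumulator
        // that later handlers (and the final response) can read.
        async fn handle(
            &self,
            _req: &HeartbeatRequest,
            _ctx: &mut Context,
            _acc: &mut HeartbeatAccumulator,
        ) -> Result<()> {
            Ok(())
        }
    }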

View File

@@ -19,7 +19,6 @@ use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
-#[derive(Default)]
 pub struct CheckLeaderHandler;
 
 #[async_trait::async_trait]

View File

@@ -20,7 +20,6 @@ use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
-#[derive(Default)]
 pub struct CollectStatsHandler;
 
 #[async_trait::async_trait]

View File

@@ -0,0 +1,54 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::meta::{HeartbeatRequest, Role};
+use async_trait::async_trait;
+use common_telemetry::warn;
+
+use crate::error::Result;
+use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
+use crate::metasrv::Context;
+
+pub struct FilterInactiveRegionStatsHandler;
+
+#[async_trait]
+impl HeartbeatHandler for FilterInactiveRegionStatsHandler {
+    fn is_acceptable(&self, role: Role) -> bool {
+        role == Role::Datanode
+    }
+
+    async fn handle(
+        &self,
+        req: &HeartbeatRequest,
+        _ctx: &mut Context,
+        acc: &mut HeartbeatAccumulator,
+    ) -> Result<()> {
+        if acc.inactive_region_ids.is_empty() {
+            return Ok(());
+        }
+
+        warn!(
+            "The heartbeat of this node {:?} contains some inactive regions: {:?}",
+            req.peer, acc.inactive_region_ids
+        );
+
+        let Some(stat) = acc.stat.as_mut() else {
+            return Ok(());
+        };
+
+        stat.retain_active_region_stats(&acc.inactive_region_ids);
+
+        Ok(())
+    }
+}
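
A rough illustration of what this handler does to the accumulator (hypothetical setup: `stat_with_regions` is an assumed helper, and a `Default` impl for `HeartbeatAccumulator` is assumed; neither is shown in this diff):

    let mut acc = HeartbeatAccumulator::default();
    acc.inactive_region_ids = HashSet::from([2]);
    acc.stat = Some(stat_with_regions(&[1, 2, 3])); // hypothetical helper
    FilterInactiveRegionStatsHandler
        .handle(&req, &mut ctx, &mut acc)
        .await?;
    // acc.stat now reports regions 1 and 3 only; region 2's stats are dropped
    // before PersistStatsHandler ever sees them.

Note the ordering dependency: this only works because `RegionLeaseHandler` has already populated `acc.inactive_region_ids` (see the builder changes below).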

View File

@@ -22,7 +22,6 @@ use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::keys::{LeaseKey, LeaseValue};
 use crate::metasrv::Context;
 
-#[derive(Default)]
 pub struct KeepLeaseHandler;
 
 #[async_trait::async_trait]

View File

@@ -18,7 +18,6 @@ use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
-#[derive(Default)]
 pub struct MailboxHandler;
 
 #[async_trait::async_trait]

View File

@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
+
 use api::v1::meta::HeartbeatRequest;
 use common_time::util as time_util;
 use serde::{Deserialize, Serialize};
@@ -60,6 +62,18 @@ impl Stat {
     pub fn region_ids(&self) -> Vec<u64> {
         self.region_stats.iter().map(|s| s.id).collect()
     }
+
+    pub fn retain_active_region_stats(&mut self, inactive_region_ids: &HashSet<u64>) {
+        if inactive_region_ids.is_empty() {
+            return;
+        }
+
+        self.region_stats
+            .retain(|r| !inactive_region_ids.contains(&r.id));
+        self.rcus = self.region_stats.iter().map(|s| s.rcus).sum();
+        self.wcus = self.region_stats.iter().map(|s| s.wcus).sum();
+        self.region_num = self.region_stats.len() as u64;
+    }
 }
 
 impl TryFrom<HeartbeatRequest> for Stat {
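
A small sketch of the recompute semantics, assuming `Stat` and `RegionStat` can be built via struct-update syntax from `Default` impls (not shown in this diff):

    use std::collections::HashSet;

    let mut stat = Stat {
        region_stats: vec![
            RegionStat { id: 1, rcus: 10, wcus: 4, ..Default::default() },
            RegionStat { id: 2, rcus: 30, wcus: 6, ..Default::default() },
        ],
        ..Default::default()
    };
    stat.retain_active_region_stats(&HashSet::from([2]));

    // Only region 1 survives, and the roll-up fields follow the survivors:
    assert_eq!(stat.region_num, 1);
    assert_eq!(stat.rcus, 10);
    assert_eq!(stat.wcus, 4);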

View File

@@ -18,7 +18,6 @@ use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
-#[derive(Default)]
 pub struct OnLeaderStartHandler;
 
 #[async_trait::async_trait]

View File

@@ -12,24 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
-
 use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
 use async_trait::async_trait;
-use store_api::storage::RegionId;
 
 use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::inactive_node_manager::InactiveNodeManager;
 use crate::metasrv::Context;
 
-/// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus
-/// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second).
-// TODO(LFC): Make region lease seconds calculated from Datanode heartbeat configuration.
-pub(crate) const REGION_LEASE_SECONDS: u64 = 20;
-
-#[derive(Default)]
-pub(crate) struct RegionLeaseHandler;
+pub struct RegionLeaseHandler {
+    region_lease_seconds: u64,
+}
+
+impl RegionLeaseHandler {
+    pub fn new(region_lease_seconds: u64) -> Self {
+        Self {
+            region_lease_seconds,
+        }
+    }
+}
 
 #[async_trait]
 impl HeartbeatHandler for RegionLeaseHandler {
@@ -47,26 +48,18 @@ impl HeartbeatHandler for RegionLeaseHandler {
             return Ok(());
         };
 
-        let mut table_region_leases = HashMap::new();
-        stat.region_stats.iter().for_each(|region_stat| {
-            let region_id = RegionId::from(region_stat.id);
-            table_region_leases
-                .entry(region_id.table_id())
-                .or_insert_with(Vec::new)
-                .push(region_id.region_number());
-        });
         let mut region_ids = stat.region_ids();
 
         let inactive_node_manager = InactiveNodeManager::new(&ctx.in_memory);
-        inactive_node_manager
+        let inactive_region_ids = inactive_node_manager
             .retain_active_regions(stat.cluster_id, stat.id, &mut region_ids)
             .await?;
+        acc.inactive_region_ids = inactive_region_ids;
 
         acc.region_lease = Some(RegionLease {
             region_ids,
             duration_since_epoch: req.duration_since_epoch,
-            lease_seconds: REGION_LEASE_SECONDS,
+            lease_seconds: self.region_lease_seconds,
         });
 
         Ok(())
@@ -80,11 +73,12 @@ mod test {
     use common_meta::ident::TableIdent;
     use common_meta::key::TableMetadataManager;
     use common_meta::RegionIdent;
-    use store_api::storage::RegionNumber;
+    use store_api::storage::{RegionId, RegionNumber};
 
     use super::*;
     use crate::handler::node_stat::{RegionStat, Stat};
     use crate::metasrv::builder::MetaSrvBuilder;
+    use crate::metasrv::DEFAULT_REGION_LEASE_SECS;
     use crate::service::store::kv::KvBackendAdapter;
     use crate::{table_routes, test_util};
@@ -158,12 +152,15 @@ mod test {
             .await
             .unwrap();
 
-        RegionLeaseHandler.handle(&req, ctx, acc).await.unwrap();
+        RegionLeaseHandler::new(DEFAULT_REGION_LEASE_SECS)
+            .handle(&req, ctx, acc)
+            .await
+            .unwrap();
 
         assert!(acc.region_lease.is_some());
         let lease = acc.region_lease.as_ref().unwrap();
         assert_eq!(lease.region_ids, vec![RegionId::new(table_id, 2).as_u64()]);
         assert_eq!(lease.duration_since_epoch, 1234);
-        assert_eq!(lease.lease_seconds, REGION_LEASE_SECONDS);
+        assert_eq!(lease.lease_seconds, DEFAULT_REGION_LEASE_SECS);
     }
 }

View File

@@ -18,7 +18,6 @@ use crate::error::Result;
 use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
 use crate::metasrv::Context;
 
-#[derive(Default)]
 pub struct ResponseHeaderHandler;
 
 #[async_trait::async_trait]

View File

@@ -19,7 +19,7 @@ use common_meta::RegionIdent;
 use store_api::storage::RegionId;
 
 use crate::error::Result;
-use crate::keys::InactiveNodeKey;
+use crate::keys::InactiveRegionKey;
 use crate::service::store::kv::ResettableKvStoreRef;
 
 pub struct InactiveNodeManager<'a> {
@@ -37,7 +37,7 @@ impl<'a> InactiveNodeManager<'a> {
             region_ident.region_number,
         )
         .as_u64();
-        let key = InactiveNodeKey {
+        let key = InactiveRegionKey {
             cluster_id: region_ident.cluster_id,
             node_id: region_ident.datanode_id,
             region_id,
@@ -57,7 +57,7 @@ impl<'a> InactiveNodeManager<'a> {
             region_ident.region_number,
         )
         .as_u64();
-        let key: Vec<u8> = InactiveNodeKey {
+        let key: Vec<u8> = InactiveRegionKey {
             cluster_id: region_ident.cluster_id,
             node_id: region_ident.datanode_id,
             region_id,
@@ -68,19 +68,19 @@ impl<'a> InactiveNodeManager<'a> {
     }
 
     /// The input is a list of regions on a specific node. If one or more regions have been
-    /// set to inactive state by metasrv, the corresponding regions will be removed, then
-    /// return the remaining regions.
+    /// set to inactive state by metasrv, the corresponding regions will be removed (updating
+    /// `region_ids` in place), then the removed regions are returned.
     pub async fn retain_active_regions(
         &self,
         cluster_id: u64,
         node_id: u64,
         region_ids: &mut Vec<u64>,
-    ) -> Result<()> {
+    ) -> Result<HashSet<u64>> {
         let key_region_ids = region_ids
             .iter()
             .map(|region_id| {
                 (
-                    InactiveNodeKey {
+                    InactiveRegionKey {
                         cluster_id,
                         node_id,
                         region_id: *region_id,
@@ -94,17 +94,24 @@ impl<'a> InactiveNodeManager<'a> {
         let resp = self.store.batch_get(BatchGetRequest { keys }).await?;
         let kvs = resp.kvs;
         if kvs.is_empty() {
-            return Ok(());
+            return Ok(HashSet::new());
        }
 
         let inactive_keys = kvs.into_iter().map(|kv| kv.key).collect::<HashSet<_>>();
-        let active_region_ids = key_region_ids
-            .into_iter()
-            .filter(|(key, _)| !inactive_keys.contains(key))
-            .map(|(_, region_id)| region_id)
-            .collect::<Vec<_>>();
-        *region_ids = active_region_ids;
+        let (active_region_ids, inactive_region_ids): (Vec<Option<u64>>, Vec<Option<u64>>) =
+            key_region_ids
+                .into_iter()
+                .map(|(key, region_id)| {
+                    let is_active = !inactive_keys.contains(&key);
+                    if is_active {
+                        (Some(region_id), None)
+                    } else {
+                        (None, Some(region_id))
+                    }
+                })
+                .unzip();
+        *region_ids = active_region_ids.into_iter().flatten().collect();
 
-        Ok(())
+        Ok(inactive_region_ids.into_iter().flatten().collect())
     }
 }
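
The split above is a small std-only idiom: map each element to a `(Some(_), None)` or `(None, Some(_))` pair, `unzip` into two vectors, then flatten each side. In isolation, with plain integers:

    let (active, inactive): (Vec<Option<u64>>, Vec<Option<u64>>) = (0..6u64)
        .map(|n| if n % 2 == 0 { (Some(n), None) } else { (None, Some(n)) })
        .unzip();
    let active: Vec<u64> = active.into_iter().flatten().collect(); // [0, 2, 4]
    let inactive: Vec<u64> = inactive.into_iter().flatten().collect(); // [1, 3, 5]

`Iterator::partition` would do the same job with less ceremony, since both sides keep the element type; the `unzip` form presumably keeps the per-element classification explicit.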

View File

@@ -26,7 +26,7 @@ use crate::handler::node_stat::Stat;
 pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
 pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
-pub(crate) const INACTIVE_NODE_PREFIX: &str = "__meta_inactive_node";
+pub(crate) const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
 
 pub const DN_STAT_PREFIX: &str = "__meta_dnstat";
@@ -238,17 +238,17 @@ impl TryFrom<Vec<u8>> for StatValue {
 }
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
-pub struct InactiveNodeKey {
+pub struct InactiveRegionKey {
     pub cluster_id: u64,
     pub node_id: u64,
     pub region_id: u64,
 }
 
-impl From<InactiveNodeKey> for Vec<u8> {
-    fn from(value: InactiveNodeKey) -> Self {
+impl From<InactiveRegionKey> for Vec<u8> {
+    fn from(value: InactiveRegionKey) -> Self {
         format!(
             "{}-{}-{}-{}",
-            INACTIVE_NODE_PREFIX, value.cluster_id, value.node_id, value.region_id
+            INACTIVE_REGION_PREFIX, value.cluster_id, value.node_id, value.region_id
         )
         .into_bytes()
     }
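
Worked example of the key layout produced by the `From` impl above:

    let key: Vec<u8> = InactiveRegionKey {
        cluster_id: 0,
        node_id: 1,
        region_id: 42,
    }
    .into();
    assert_eq!(key, b"__meta_inactive_region-0-1-42".to_vec());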

View File

@@ -24,10 +24,10 @@ use crate::keys::{LeaseKey, LeaseValue, DN_LEASE_PREFIX};
 pub async fn alive_datanodes(
     cluster_id: u64,
     meta_peer_client: &MetaPeerClientRef,
-    lease_secs: i64,
+    lease_secs: u64,
 ) -> Result<HashMap<LeaseKey, LeaseValue>> {
     let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
-        time_util::current_time_millis() - v.timestamp_millis < lease_secs * 1000
+        ((time_util::current_time_millis() - v.timestamp_millis) as u64) < lease_secs * 1000
     };
     filter_datanodes(cluster_id, meta_peer_client, lease_filter).await
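
With the parameter now unsigned, the cast keeps the comparison in `u64`; e.g. for `lease_secs = 20`, a 19-second-old heartbeat passes and a 21-second-old one does not:

    let lease_secs: u64 = 20;
    let elapsed_ms: i64 = 19_000; // current_time_millis() - v.timestamp_millis
    assert!((elapsed_ms as u64) < lease_secs * 1000);
    assert!(!((21_000i64 as u64) < lease_secs * 1000));

One side effect of the cast worth noting: if a heartbeat timestamp were somehow in the future, the negative difference would wrap to a huge `u64` and the datanode would be treated as expired rather than alive.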

View File

@@ -45,7 +45,12 @@ use crate::sequence::SequenceRef;
 use crate::service::mailbox::MailboxRef;
 use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef};
 
 pub const TABLE_ID_SEQ: &str = "table_id";
-const METASRV_HOME: &str = "/tmp/metasrv";
+pub const METASRV_HOME: &str = "/tmp/metasrv";
+
+pub const DEFAULT_DATANODE_LEASE_SECS: u64 = 20;
+/// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus
+/// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second).
+pub const DEFAULT_REGION_LEASE_SECS: u64 = 20;
 
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 #[serde(default)]
@@ -53,10 +58,11 @@ pub struct MetaSrvOptions {
     pub bind_addr: String,
     pub server_addr: String,
     pub store_addr: String,
-    pub datanode_lease_secs: i64,
+    pub datanode_lease_secs: u64,
+    pub region_lease_secs: u64,
     pub selector: SelectorType,
     pub use_memory_store: bool,
-    pub disable_region_failover: bool,
+    pub enable_region_failover: bool,
     pub http_opts: HttpOptions,
     pub logging: LoggingOptions,
     pub procedure: ProcedureConfig,
@@ -71,10 +77,11 @@ impl Default for MetaSrvOptions {
             bind_addr: "127.0.0.1:3002".to_string(),
             server_addr: "127.0.0.1:3002".to_string(),
             store_addr: "127.0.0.1:2379".to_string(),
-            datanode_lease_secs: 15,
+            datanode_lease_secs: DEFAULT_DATANODE_LEASE_SECS,
+            region_lease_secs: DEFAULT_REGION_LEASE_SECS,
             selector: SelectorType::default(),
             use_memory_store: false,
-            disable_region_failover: false,
+            enable_region_failover: true,
             http_opts: HttpOptions::default(),
             logging: LoggingOptions {
                 dir: format!("{METASRV_HOME}/logs"),
@@ -161,8 +168,8 @@ pub struct LeaderValue(pub String);
 #[derive(Clone)]
 pub struct SelectorContext {
-    pub datanode_lease_secs: i64,
     pub server_addr: String,
+    pub datanode_lease_secs: u64,
     pub kv_store: KvStoreRef,
     pub meta_peer_client: MetaPeerClientRef,
     pub catalog: Option<String>,
@@ -372,6 +379,7 @@ impl MetaSrv {
         let mailbox = self.mailbox.clone();
         let election = self.election.clone();
         let skip_all = Arc::new(AtomicBool::new(false));
+        let table_metadata_manager = self.table_metadata_manager.clone();
 
         Context {
             server_addr,
@@ -383,7 +391,7 @@
             election,
             skip_all,
             is_infancy: false,
-            table_metadata_manager: self.table_metadata_manager.clone(),
+            table_metadata_manager,
         }
     }
 }
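
The arithmetic behind the lease constants, restating the doc comment above:

    let heartbeats = 5 * 2; // two default heartbeat intervals, 5 s each
    let roundtrips = 2 * 2 * 2; // two round trips, 2 s per leg
    let buffer = 2; // extra slack
    assert_eq!(heartbeats + roundtrips + buffer, 20); // == DEFAULT_REGION_LEASE_SECS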

View File

@@ -27,14 +27,18 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
 use crate::ddl::{DdlManager, DdlManagerRef};
 use crate::error::Result;
 use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
+use crate::handler::check_leader_handler::CheckLeaderHandler;
+use crate::handler::collect_stats_handler::CollectStatsHandler;
+use crate::handler::failure_handler::RegionFailureHandler;
+use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
+use crate::handler::keep_lease_handler::KeepLeaseHandler;
 use crate::handler::mailbox_handler::MailboxHandler;
+use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
+use crate::handler::persist_stats_handler::PersistStatsHandler;
 use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
+use crate::handler::region_lease_handler::RegionLeaseHandler;
-use crate::handler::{
-    CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
-    KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler,
-    ResponseHeaderHandler,
-};
+use crate::handler::response_header_handler::ResponseHeaderHandler;
+use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers};
 use crate::lock::memory::MemLock;
 use crate::lock::DistLockRef;
 use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef};
@@ -186,10 +190,8 @@ impl MetaSrvBuilder {
         let handler_group = match handler_group {
             Some(handler_group) => handler_group,
             None => {
-                let region_failover_handler = if options.disable_region_failover {
-                    None
-                } else {
-                    let select_ctx = SelectorContext {
+                let region_failover_handler = if options.enable_region_failover {
+                    let selector_ctx = SelectorContext {
                         server_addr: options.server_addr.clone(),
                         datanode_lease_secs: options.datanode_lease_secs,
                         kv_store: kv_store.clone(),
@@ -199,11 +201,11 @@
                         table: None,
                     };
 
                     let region_failover_manager = Arc::new(RegionFailoverManager::new(
+                        options.region_lease_secs,
                         in_memory.clone(),
                         mailbox.clone(),
                         procedure_manager.clone(),
-                        selector.clone(),
-                        select_ctx,
+                        (selector.clone(), selector_ctx),
                         lock.clone(),
                         table_metadata_manager.clone(),
                     ));
@@ -211,8 +213,18 @@
                     RegionFailureHandler::try_new(election.clone(), region_failover_manager)
                         .await?,
                     )
+                } else {
+                    None
                 };
+
+                let publish_heartbeat_handler = if let Some((publish, _)) = pubsub.as_ref() {
+                    Some(PublishHeartbeatHandler::new(publish.clone()))
+                } else {
+                    None
+                };
+
+                let region_lease_handler = RegionLeaseHandler::new(options.region_lease_secs);
 
                 let group = HeartbeatHandlerGroup::new(pushers);
                 group.add_handler(ResponseHeaderHandler).await;
                 // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
@@ -223,16 +235,15 @@
                 group.add_handler(OnLeaderStartHandler).await;
                 group.add_handler(CollectStatsHandler).await;
                 group.add_handler(MailboxHandler).await;
+                group.add_handler(region_lease_handler).await;
+                group.add_handler(FilterInactiveRegionStatsHandler).await;
                 if let Some(region_failover_handler) = region_failover_handler {
                     group.add_handler(region_failover_handler).await;
                 }
-                group.add_handler(RegionLeaseHandler).await;
-                group.add_handler(PersistStatsHandler::default()).await;
-                if let Some((publish, _)) = pubsub.as_ref() {
-                    group
-                        .add_handler(PublishHeartbeatHandler::new(publish.clone()))
-                        .await;
+                if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
+                    group.add_handler(publish_heartbeat_handler).await;
                 }
+                group.add_handler(PersistStatsHandler::default()).await;
                 group
             }
         };
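
After this reordering, a datanode heartbeat passes through the group roughly as: ResponseHeader → KeepLease → CheckLeader → OnLeaderStart → CollectStats → Mailbox → RegionLease (fills `acc.inactive_region_ids`) → FilterInactiveRegionStats (drops those regions' stats) → RegionFailureHandler (only if `enable_region_failover`) → PublishHeartbeat (only if pubsub is configured) → PersistStats. The point of moving `PersistStatsHandler` to the end is that stats are persisted only after inactive regions have been filtered out.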

View File

@@ -49,7 +49,6 @@ use crate::service::mailbox::MailboxRef;
 use crate::service::store::kv::ResettableKvStoreRef;
 
 const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
-const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2);
 
 /// A key for the preventing running multiple failover procedures for the same region.
 #[derive(PartialEq, Eq, Hash, Clone)]
@@ -70,6 +69,7 @@ impl From<RegionIdent> for RegionFailoverKey {
 }
 
 pub(crate) struct RegionFailoverManager {
+    region_lease_secs: u64,
     in_memory: ResettableKvStoreRef,
     mailbox: MailboxRef,
     procedure_manager: ProcedureManagerRef,
@@ -93,15 +93,16 @@ impl Drop for FailoverProcedureGuard {
 impl RegionFailoverManager {
     pub(crate) fn new(
+        region_lease_secs: u64,
         in_memory: ResettableKvStoreRef,
         mailbox: MailboxRef,
         procedure_manager: ProcedureManagerRef,
-        selector: SelectorRef,
-        selector_ctx: SelectorContext,
+        (selector, selector_ctx): (SelectorRef, SelectorContext),
         dist_lock: DistLockRef,
         table_metadata_manager: TableMetadataManagerRef,
     ) -> Self {
         Self {
+            region_lease_secs,
             in_memory,
             mailbox,
             procedure_manager,
@@ -115,6 +116,7 @@ impl RegionFailoverManager {
     pub(crate) fn create_context(&self) -> RegionFailoverContext {
         RegionFailoverContext {
+            region_lease_secs: self.region_lease_secs,
             in_memory: self.in_memory.clone(),
             mailbox: self.mailbox.clone(),
             selector: self.selector.clone(),
@@ -247,6 +249,7 @@ struct Node {
 /// The "Context" of region failover procedure state machine.
 #[derive(Clone)]
 pub struct RegionFailoverContext {
+    pub region_lease_secs: u64,
     pub in_memory: ResettableKvStoreRef,
     pub mailbox: MailboxRef,
     pub selector: SelectorRef,
@@ -543,6 +546,7 @@ mod tests {
         TestingEnv {
             context: RegionFailoverContext {
+                region_lease_secs: 10,
                 in_memory,
                 mailbox,
                 selector,
@@ -770,7 +774,7 @@
         let result = procedure.execute(&ctx).await;
         assert!(matches!(result, Ok(Status::Executing { persist: true })));
         assert_eq!(
-            r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""},"region_lease_expiry_seconds":40}"#,
+            r#"{"region_failover_state":"DeactivateRegion","candidate":{"id":42,"addr":""}}"#,
             serde_json::to_string(&procedure.node.state).unwrap()
         );
     }

View File

@@ -19,7 +19,7 @@ use async_trait::async_trait;
 use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
 use common_meta::peer::Peer;
 use common_meta::RegionIdent;
-use common_telemetry::debug;
+use common_telemetry::{debug, info};
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
@@ -135,6 +135,7 @@ impl State for ActivateRegion {
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
+        info!("Activating region: {failed_region:?}");
         let mailbox_receiver = self
             .send_open_region_message(ctx, failed_region, OPEN_REGION_MESSAGE_TIMEOUT)
             .await?;

View File

@@ -19,7 +19,7 @@ use async_trait::async_trait;
 use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
 use common_meta::peer::Peer;
 use common_meta::RegionIdent;
-use common_telemetry::debug;
+use common_telemetry::{debug, info, warn};
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
@@ -28,31 +28,24 @@ use super::{RegionFailoverContext, State};
 use crate::error::{
     Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
 };
-use crate::handler::region_lease_handler::REGION_LEASE_SECONDS;
 use crate::handler::HeartbeatMailbox;
 use crate::inactive_node_manager::InactiveNodeManager;
-use crate::procedure::region_failover::CLOSE_REGION_MESSAGE_TIMEOUT;
 use crate::service::mailbox::{Channel, MailboxReceiver};
 
 #[derive(Serialize, Deserialize, Debug)]
 pub(super) struct DeactivateRegion {
     candidate: Peer,
-    region_lease_expiry_seconds: u64,
 }
 
 impl DeactivateRegion {
     pub(super) fn new(candidate: Peer) -> Self {
-        Self {
-            candidate,
-            region_lease_expiry_seconds: REGION_LEASE_SECONDS * 2,
-        }
+        Self { candidate }
     }
 
     async fn send_close_region_message(
         &self,
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
-        timeout: Duration,
     ) -> Result<MailboxReceiver> {
         let instruction = Instruction::CloseRegion(failed_region.clone());
@@ -67,11 +60,17 @@ impl DeactivateRegion {
             input: instruction.to_string(),
         })?;
 
-        let ch = Channel::Datanode(failed_region.datanode_id);
+        // Mark the region as inactive
+        InactiveNodeManager::new(&ctx.in_memory)
+            .register_inactive_region(failed_region)
+            .await?;
+
+        let ch = Channel::Datanode(failed_region.datanode_id);
+        // We first marked the region as inactive, which means that the failed region cannot
+        // be successfully renewed from now on, so after the lease time is exceeded, the region
+        // will be automatically closed.
+        // If the deadline is exceeded, we can proceed to the next step with confidence,
+        // as the expiration means that the region has been closed.
+        let timeout = Duration::from_secs(ctx.region_lease_secs);
         ctx.mailbox.send(&ch, msg, timeout).await
     }
@@ -110,10 +109,10 @@ impl DeactivateRegion {
                 }
             }
             Err(Error::MailboxTimeout { .. }) => {
-                // Since we are in a region failover situation, the Datanode that the failed region
-                // resides might be unreachable. So we wait for the region lease to expire. The
-                // region would be closed by its own [RegionAliveKeeper].
-                self.wait_for_region_lease_expiry().await;
+                // We have configured the timeout to match the region lease timeout before making
+                // the call and have disabled region lease renewal. Therefore, if a timeout error
+                // occurs, it can be concluded that the region has been closed. With this
+                // information, we can proceed confidently to the next step.
                 Ok(Box::new(ActivateRegion::new(self.candidate.clone())))
             }
             Err(e) => Err(e),
@@ -123,8 +122,8 @@ impl DeactivateRegion {
     /// Sleep for the region lease expiry time, to make sure the region is closed (by its
     /// region alive keeper). This is critical for region not being opened in multiple Datanodes
     /// simultaneously.
-    async fn wait_for_region_lease_expiry(&self) {
-        tokio::time::sleep(Duration::from_secs(self.region_lease_expiry_seconds)).await;
+    async fn wait_for_region_lease_expiry(&self, ctx: &RegionFailoverContext) {
+        tokio::time::sleep(Duration::from_secs(ctx.region_lease_secs)).await;
     }
 }
@@ -136,14 +135,17 @@ impl State for DeactivateRegion {
         ctx: &RegionFailoverContext,
         failed_region: &RegionIdent,
     ) -> Result<Box<dyn State>> {
-        let result = self
-            .send_close_region_message(ctx, failed_region, CLOSE_REGION_MESSAGE_TIMEOUT)
-            .await;
+        info!("Deactivating region: {failed_region:?}");
+
+        let result = self.send_close_region_message(ctx, failed_region).await;
         let mailbox_receiver = match result {
             Ok(mailbox_receiver) => mailbox_receiver,
             Err(Error::PusherNotFound { .. }) => {
+                warn!(
+                    "Datanode {} is not reachable, skip deactivating region {}, just wait for the region lease to expire",
+                    failed_region.datanode_id, failed_region
+                );
                 // See the mailbox received timeout situation comments above.
-                self.wait_for_region_lease_expiry().await;
+                self.wait_for_region_lease_expiry(ctx).await;
                 return Ok(Box::new(ActivateRegion::new(self.candidate.clone())));
             }
             Err(e) => return Err(e),
@@ -171,7 +173,7 @@ mod tests {
         let state = DeactivateRegion::new(Peer::new(2, ""));
         let mailbox_receiver = state
-            .send_close_region_message(&env.context, &failed_region, Duration::from_millis(100))
+            .send_close_region_message(&env.context, &failed_region)
            .await
            .unwrap();
@@ -235,12 +237,9 @@ mod tests {
         let mut env = TestingEnvBuilder::new().build().await;
         let failed_region = env.failed_region(1).await;
 
-        let state = DeactivateRegion {
-            candidate: Peer::new(2, ""),
-            region_lease_expiry_seconds: 2,
-        };
+        let state = DeactivateRegion::new(Peer::new(2, ""));
         let mailbox_receiver = state
-            .send_close_region_message(&env.context, &failed_region, Duration::from_millis(100))
+            .send_close_region_message(&env.context, &failed_region)
            .await
            .unwrap();
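
The net close-region protocol, with the default `region_lease_secs = 20` (an illustrative timeline, not code from this change):

    // t = 0s   region is marked inactive -> its lease can no longer be renewed
    // t = 0s   CloseRegion is sent with timeout = region_lease_secs (20 s)
    // t < 20s  datanode replies           -> region closed; proceed to ActivateRegion
    // t = 20s  mailbox times out          -> the lease has expired, so the datanode's
    //          RegionAliveKeeper has closed the region; proceed to ActivateRegion anyway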

View File

@@ -76,11 +76,11 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
     };
 
     Arc::new(RegionFailoverManager::new(
+        10,
         in_memory,
         mailbox,
         procedure_manager,
-        selector,
-        selector_ctx,
+        (selector, selector_ctx),
         Arc::new(MemLock::default()),
         Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(kv_store))),
    ))