mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
refactor: heartbeat response contains region role (#2718)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3613,7 +3613,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5da72f1cae6b24315e5afc87520aaf7b4d6bb872#5da72f1cae6b24315e5afc87520aaf7b4d6bb872"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7eb2e78be7a104d2582fbea0bcb1e019407da702#7eb2e78be7a104d2582fbea0bcb1e019407da702"
|
||||
dependencies = [
|
||||
"prost 0.12.1",
|
||||
"serde",
|
||||
|
||||
@@ -83,7 +83,7 @@ derive_builder = "0.12"
|
||||
etcd-client = "0.12"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5da72f1cae6b24315e5afc87520aaf7b4d6bb872" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7eb2e78be7a104d2582fbea0bcb1e019407da702" }
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
lazy_static = "1.4"
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::future::Future;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::GrantedRegion;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
@@ -26,6 +27,7 @@ use common_meta::heartbeat::handler::{
|
||||
};
|
||||
use common_telemetry::{debug, error, info, trace, warn};
|
||||
use snafu::OptionExt;
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::region_request::{RegionCloseRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
#[cfg(test)]
|
||||
@@ -122,10 +124,11 @@ impl RegionAliveKeeper {
|
||||
}
|
||||
}
|
||||
|
||||
async fn keep_lived(&self, designated_regions: Vec<RegionId>, deadline: Instant) {
|
||||
for region_id in designated_regions {
|
||||
async fn keep_lived(&self, regions: &[GrantedRegion], deadline: Instant) {
|
||||
for region in regions {
|
||||
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
|
||||
if let Some(handle) = self.find_handle(region_id).await {
|
||||
handle.reset_deadline(deadline).await;
|
||||
handle.reset_deadline(role, deadline).await;
|
||||
}
|
||||
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
|
||||
}
|
||||
@@ -235,12 +238,8 @@ impl HeartbeatResponseHandler for RegionAliveKeeper {
|
||||
})?;
|
||||
let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
|
||||
let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
|
||||
let region_ids = region_lease
|
||||
.region_ids
|
||||
.iter()
|
||||
.map(|id| RegionId::from_u64(*id))
|
||||
.collect();
|
||||
self.keep_lived(region_ids, deadline).await;
|
||||
|
||||
self.keep_lived(®ion_lease.regions, deadline).await;
|
||||
Ok(HandleControl::Continue)
|
||||
}
|
||||
}
|
||||
@@ -251,7 +250,8 @@ enum CountdownCommand {
|
||||
/// 4 * `heartbeat_interval_millis`
|
||||
Start(u64),
|
||||
/// Reset countdown deadline to the given instance.
|
||||
Reset(Instant),
|
||||
/// (NextRole, Deadline)
|
||||
Reset((RegionRole, Instant)),
|
||||
/// Returns the current deadline of the countdown task.
|
||||
#[cfg(test)]
|
||||
Deadline(oneshot::Sender<Instant>),
|
||||
@@ -319,8 +319,12 @@ impl CountdownTaskHandle {
|
||||
None
|
||||
}
|
||||
|
||||
async fn reset_deadline(&self, deadline: Instant) {
|
||||
if let Err(e) = self.tx.send(CountdownCommand::Reset(deadline)).await {
|
||||
async fn reset_deadline(&self, role: RegionRole, deadline: Instant) {
|
||||
if let Err(e) = self
|
||||
.tx
|
||||
.send(CountdownCommand::Reset((role, deadline)))
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"Failed to reset region alive keeper deadline: {e}. \
|
||||
Maybe the task is stopped due to region been closed."
|
||||
@@ -368,13 +372,13 @@ impl CountdownTask {
|
||||
let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
|
||||
countdown.set(tokio::time::sleep_until(first_deadline));
|
||||
},
|
||||
Some(CountdownCommand::Reset(deadline)) => {
|
||||
Some(CountdownCommand::Reset((role, deadline))) => {
|
||||
if countdown.deadline() < deadline {
|
||||
trace!(
|
||||
"Reset deadline of region {region_id} to approximately {} seconds later",
|
||||
(deadline - Instant::now()).as_secs_f32(),
|
||||
);
|
||||
let _ = self.region_server.set_writable(self.region_id, true);
|
||||
let _ = self.region_server.set_writable(self.region_id, role.writable());
|
||||
countdown.set(tokio::time::sleep_until(deadline));
|
||||
}
|
||||
// Else the countdown could be either:
|
||||
@@ -434,6 +438,8 @@ impl CountdownTask {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use api::v1::meta::RegionRole;
|
||||
|
||||
use super::*;
|
||||
use crate::tests::mock_region_server;
|
||||
|
||||
@@ -455,7 +461,13 @@ mod test {
|
||||
|
||||
// extend lease then sleep
|
||||
alive_keeper
|
||||
.keep_lived(vec![region_id], Instant::now() + Duration::from_millis(500))
|
||||
.keep_lived(
|
||||
&[GrantedRegion {
|
||||
region_id: region_id.as_u64(),
|
||||
role: RegionRole::Leader.into(),
|
||||
}],
|
||||
Instant::now() + Duration::from_millis(500),
|
||||
)
|
||||
.await;
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
assert!(alive_keeper.find_handle(region_id).await.is_some());
|
||||
@@ -499,7 +511,10 @@ mod test {
|
||||
// reset deadline
|
||||
// a nearer deadline will be ignored
|
||||
countdown_handle
|
||||
.reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis))
|
||||
.reset_deadline(
|
||||
RegionRole::Leader.into(),
|
||||
Instant::now() + Duration::from_millis(heartbeat_interval_millis),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
countdown_handle.deadline().await.unwrap()
|
||||
@@ -508,7 +523,10 @@ mod test {
|
||||
|
||||
// only a farther deadline will be accepted
|
||||
countdown_handle
|
||||
.reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5))
|
||||
.reset_deadline(
|
||||
RegionRole::Leader.into(),
|
||||
Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
countdown_handle.deadline().await.unwrap()
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
|
||||
use api::v1::meta::{GrantedRegion, HeartbeatRequest, RegionLease, RegionRole, Role};
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -55,9 +55,17 @@ impl HeartbeatHandler for RegionLeaseHandler {
|
||||
.retain_active_regions(stat.cluster_id, stat.id, &mut region_ids)
|
||||
.await?;
|
||||
|
||||
let regions = region_ids
|
||||
.into_iter()
|
||||
.map(|region_id| GrantedRegion {
|
||||
region_id,
|
||||
role: RegionRole::Leader.into(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
acc.inactive_region_ids = inactive_region_ids;
|
||||
acc.region_lease = Some(RegionLease {
|
||||
region_ids,
|
||||
regions,
|
||||
duration_since_epoch: req.duration_since_epoch,
|
||||
lease_seconds: self.region_lease_seconds,
|
||||
});
|
||||
@@ -70,9 +78,9 @@ impl HeartbeatHandler for RegionLeaseHandler {
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::meta::RegionRole;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
use common_meta::{distributed_time_constants, RegionIdent};
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
|
||||
use super::*;
|
||||
@@ -113,7 +121,7 @@ mod test {
|
||||
approximate_bytes: 0,
|
||||
approximate_rows: 0,
|
||||
engine: String::new(),
|
||||
role: RegionRole::Leader,
|
||||
role: RegionRole::Leader.into(),
|
||||
}
|
||||
};
|
||||
acc.stat = Some(Stat {
|
||||
@@ -152,7 +160,13 @@ mod test {
|
||||
|
||||
assert!(acc.region_lease.is_some());
|
||||
let lease = acc.region_lease.as_ref().unwrap();
|
||||
assert_eq!(lease.region_ids, vec![RegionId::new(table_id, 2).as_u64()]);
|
||||
assert_eq!(
|
||||
lease.regions,
|
||||
vec![GrantedRegion {
|
||||
region_id: RegionId::new(table_id, 2).as_u64(),
|
||||
role: RegionRole::Leader.into()
|
||||
}]
|
||||
);
|
||||
assert_eq!(lease.duration_since_epoch, 1234);
|
||||
assert_eq!(
|
||||
lease.lease_seconds,
|
||||
|
||||
@@ -35,6 +35,12 @@ pub enum RegionRole {
|
||||
Leader,
|
||||
}
|
||||
|
||||
impl RegionRole {
|
||||
pub fn writable(&self) -> bool {
|
||||
matches!(self, RegionRole::Leader)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RegionRole> for PbRegionRole {
|
||||
fn from(value: RegionRole) -> Self {
|
||||
match value {
|
||||
|
||||
Reference in New Issue
Block a user