diff --git a/Cargo.lock b/Cargo.lock index 5db8a29a3b..d96eb26759 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3613,7 +3613,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5da72f1cae6b24315e5afc87520aaf7b4d6bb872#5da72f1cae6b24315e5afc87520aaf7b4d6bb872" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7eb2e78be7a104d2582fbea0bcb1e019407da702#7eb2e78be7a104d2582fbea0bcb1e019407da702" dependencies = [ "prost 0.12.1", "serde", diff --git a/Cargo.toml b/Cargo.toml index f7d29eb2f6..0659941d9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ derive_builder = "0.12" etcd-client = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5da72f1cae6b24315e5afc87520aaf7b4d6bb872" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7eb2e78be7a104d2582fbea0bcb1e019407da702" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 4ba8197bc4..21282e9ff5 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -17,6 +17,7 @@ use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use api::v1::meta::GrantedRegion; use async_trait::async_trait; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -26,6 +27,7 @@ use common_meta::heartbeat::handler::{ }; use common_telemetry::{debug, error, info, trace, warn}; use snafu::OptionExt; +use store_api::region_engine::RegionRole; use store_api::region_request::{RegionCloseRequest, RegionRequest}; use store_api::storage::RegionId; #[cfg(test)] @@ -122,10 +124,11 @@ } } - async fn keep_lived(&self, designated_regions: Vec<RegionId>, deadline: 
Instant) { - for region_id in designated_regions { + async fn keep_lived(&self, regions: &[GrantedRegion], deadline: Instant) { + for region in regions { + let (role, region_id) = (region.role().into(), RegionId::from(region.region_id)); if let Some(handle) = self.find_handle(region_id).await { - handle.reset_deadline(deadline).await; + handle.reset_deadline(role, deadline).await; } // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it. } @@ -235,12 +238,8 @@ })?; let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch); let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds); - let region_ids = region_lease - .region_ids - .iter() - .map(|id| RegionId::from_u64(*id)) - .collect(); - self.keep_lived(region_ids, deadline).await; + + self.keep_lived(&region_lease.regions, deadline).await; Ok(HandleControl::Continue) } } @@ -251,7 +250,8 @@ enum CountdownCommand { /// 4 * `heartbeat_interval_millis` Start(u64), /// Reset countdown deadline to the given instance. - Reset(Instant), + /// (NextRole, Deadline) + Reset((RegionRole, Instant)), /// Returns the current deadline of the countdown task. #[cfg(test)] Deadline(oneshot::Sender<Instant>), @@ -319,8 +319,12 @@ } None } - async fn reset_deadline(&self, deadline: Instant) { - if let Err(e) = self.tx.send(CountdownCommand::Reset(deadline)).await { + async fn reset_deadline(&self, role: RegionRole, deadline: Instant) { + if let Err(e) = self + .tx + .send(CountdownCommand::Reset((role, deadline))) + .await + { warn!( "Failed to reset region alive keeper deadline: {e}. \ Maybe the task is stopped due to region been closed." 
@@ -368,13 +372,13 @@ impl CountdownTask { let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4; countdown.set(tokio::time::sleep_until(first_deadline)); }, - Some(CountdownCommand::Reset(deadline)) => { + Some(CountdownCommand::Reset((role, deadline))) => { if countdown.deadline() < deadline { trace!( "Reset deadline of region {region_id} to approximately {} seconds later", (deadline - Instant::now()).as_secs_f32(), ); - let _ = self.region_server.set_writable(self.region_id, true); + let _ = self.region_server.set_writable(self.region_id, role.writable()); countdown.set(tokio::time::sleep_until(deadline)); } // Else the countdown could be either: @@ -434,6 +438,8 @@ impl CountdownTask { #[cfg(test)] mod test { + use api::v1::meta::RegionRole; + use super::*; use crate::tests::mock_region_server; @@ -455,7 +461,13 @@ mod test { // extend lease then sleep alive_keeper - .keep_lived(vec![region_id], Instant::now() + Duration::from_millis(500)) + .keep_lived( + &[GrantedRegion { + region_id: region_id.as_u64(), + role: RegionRole::Leader.into(), + }], + Instant::now() + Duration::from_millis(500), + ) .await; tokio::time::sleep(Duration::from_millis(500)).await; assert!(alive_keeper.find_handle(region_id).await.is_some()); @@ -499,7 +511,10 @@ mod test { // reset deadline // a nearer deadline will be ignored countdown_handle - .reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis)) + .reset_deadline( + RegionRole::Leader.into(), + Instant::now() + Duration::from_millis(heartbeat_interval_millis), + ) .await; assert!( countdown_handle.deadline().await.unwrap() @@ -508,7 +523,10 @@ mod test { // only a farther deadline will be accepted countdown_handle - .reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5)) + .reset_deadline( + RegionRole::Leader.into(), + Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5), + ) .await; assert!( 
countdown_handle.deadline().await.unwrap() diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 1383ff828a..96808dcd82 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; +use api::v1::meta::{GrantedRegion, HeartbeatRequest, RegionLease, RegionRole, Role}; use async_trait::async_trait; use crate::error::Result; @@ -55,9 +55,17 @@ impl HeartbeatHandler for RegionLeaseHandler { .retain_active_regions(stat.cluster_id, stat.id, &mut region_ids) .await?; + let regions = region_ids + .into_iter() + .map(|region_id| GrantedRegion { + region_id, + role: RegionRole::Leader.into(), + }) + .collect(); + acc.inactive_region_ids = inactive_region_ids; acc.region_lease = Some(RegionLease { - region_ids, + regions, duration_since_epoch: req.duration_since_epoch, lease_seconds: self.region_lease_seconds, }); @@ -70,9 +78,9 @@ impl HeartbeatHandler for RegionLeaseHandler { mod test { use std::sync::Arc; + use api::v1::meta::RegionRole; use common_meta::key::TableMetadataManager; use common_meta::{distributed_time_constants, RegionIdent}; - use store_api::region_engine::RegionRole; use store_api::storage::{RegionId, RegionNumber}; use super::*; @@ -113,7 +121,7 @@ mod test { approximate_bytes: 0, approximate_rows: 0, engine: String::new(), - role: RegionRole::Leader, + role: RegionRole::Leader.into(), } }; acc.stat = Some(Stat { @@ -152,7 +160,13 @@ mod test { assert!(acc.region_lease.is_some()); let lease = acc.region_lease.as_ref().unwrap(); - assert_eq!(lease.region_ids, vec![RegionId::new(table_id, 2).as_u64()]); + assert_eq!( + lease.regions, + vec![GrantedRegion { + region_id: RegionId::new(table_id, 2).as_u64(), + role: RegionRole::Leader.into() + }] + ); 
assert_eq!(lease.duration_since_epoch, 1234); assert_eq!( lease.lease_seconds, diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 3dbeaa6e74..bac3df5bf4 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -35,6 +35,12 @@ pub enum RegionRole { Leader, } +impl RegionRole { + pub fn writable(&self) -> bool { + matches!(self, RegionRole::Leader) + } +} + impl From<RegionRole> for PbRegionRole { fn from(value: RegionRole) -> Self { match value {