diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 1856d65634..1ea7c584bc 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -154,6 +154,14 @@ pub enum Error { #[snafu(display("Failed to get sequence: {}", err_msg))] NextSequence { err_msg: String, location: Location }, + #[snafu(display("Sequence out of range: {}, start={}, step={}", name, start, step))] + SequenceOutOfRange { + name: String, + start: u64, + step: u64, + location: Location, + }, + #[snafu(display("MetaSrv has no leader at this moment"))] NoLeader { location: Location }, @@ -379,6 +387,7 @@ impl ErrorExt for Error { | Error::UnexceptedSequenceValue { .. } | Error::TableRouteNotFound { .. } | Error::NextSequence { .. } + | Error::SequenceOutOfRange { .. } | Error::MoveValue { .. } | Error::InvalidKvsLength { .. } | Error::InvalidTxnResult { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 34064e1591..534fa408e5 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -241,6 +241,18 @@ impl HeartbeatMailbox { } } } + + #[inline] + async fn next_message_id(&self) -> Result { + // In this implementation, we pre-occupy the message_id of 0, + // and we use `message_id = 0` to mark a Message as a one-way call. + loop { + let next = self.sequence.next().await?; + if next > 0 { + return Ok(next); + } + } + } } #[async_trait::async_trait] @@ -251,7 +263,7 @@ impl Mailbox for HeartbeatMailbox { mut msg: MailboxMessage, timeout: Duration, ) -> Result { - let message_id = self.sequence.next().await?; + let message_id = self.next_message_id().await?; let pusher_id = match ch { Channel::Datanode(id) => format!("{}-{}", Role::Datanode as i32, id), @@ -288,15 +300,7 @@ impl Mailbox for HeartbeatMailbox { tx.send(maybe_msg) .map_err(|_| error::MailboxClosedSnafu { id }.build())?; } else if let Ok(finally_msg) = maybe_msg { - let MailboxMessage { - id, - subject, - from, - to, - timestamp_millis, - .. - } = finally_msg; - warn!("The response arrived too late, id={id}, subject={subject}, from={from}, to={to}, timestamp={timestamp_millis}"); + warn!("The response arrived too late: {finally_msg:?}"); } Ok(()) diff --git a/src/meta-srv/src/handler/check_leader_handler.rs b/src/meta-srv/src/handler/check_leader_handler.rs index 2d83b5e8f1..eadcce8d2a 100644 --- a/src/meta-srv/src/handler/check_leader_handler.rs +++ b/src/meta-srv/src/handler/check_leader_handler.rs @@ -13,6 +13,7 @@ // limitations under the License. use api::v1::meta::{Error, HeartbeatRequest, Role}; +use common_telemetry::warn; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; @@ -29,7 +30,7 @@ impl HeartbeatHandler for CheckLeaderHandler { async fn handle( &self, - _req: &HeartbeatRequest, + req: &HeartbeatRequest, ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { @@ -40,6 +41,7 @@ impl HeartbeatHandler for CheckLeaderHandler { if let Some(header) = &mut acc.header { header.error = Some(Error::is_not_leader()); ctx.set_skip_all(); + warn!("Received a heartbeat {:?}, but the current node is not the leader, so the heartbeat will be ignored.", req.header); } } Ok(()) diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 90f8822115..8e84b3d023 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -161,7 +161,7 @@ impl MetaSrvBuilder { let metadata_service = metadata_service .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone()))); - let mailbox_sequence = Sequence::new("heartbeat_mailbox", 0, 100, kv_store.clone()); + let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone()); let mailbox = HeartbeatMailbox::create(handler_group.pushers(), mailbox_sequence); MetaSrv { diff --git a/src/meta-srv/src/sequence.rs b/src/meta-srv/src/sequence.rs index 5ff9b89d14..0ebed971ef 100644 --- a/src/meta-srv/src/sequence.rs +++ b/src/meta-srv/src/sequence.rs @@ -16,7 +16,7 @@ use std::ops::Range; use std::sync::Arc; use api::v1::meta::CompareAndPutRequest; -use snafu::ensure; +use snafu::{ensure, OptionExt}; use tokio::sync::Mutex; use crate::error::{self, Result}; @@ -106,7 +106,15 @@ impl Inner { } else { u64::to_le_bytes(start).to_vec() }; - let value = u64::to_le_bytes(start + self.step); + + let value = start + .checked_add(self.step) + .context(error::SequenceOutOfRangeSnafu { + name: &self.name, + start, + step: self.step, + })?; + let value = u64::to_le_bytes(value); let req = CompareAndPutRequest { key: key.to_vec(), @@ -170,7 +178,25 @@ mod tests { } #[tokio::test] - async fn test_sequence_fouce_quit() { + async fn test_sequence_out_of_rage() { + let kv_store = Arc::new(MemStore::new()); + let initial = u64::MAX - 10; + let seq = Sequence::new("test_seq", initial, 10, kv_store); + + for _ in 0..10 { + let _ = seq.next().await.unwrap(); + } + + let res = seq.next().await; + assert!(res.is_err()); + assert!(matches!( + res.unwrap_err(), + error::Error::SequenceOutOfRange { .. } + )) + } + + #[tokio::test] + async fn test_sequence_force_quit() { struct Noop; #[async_trait::async_trait] diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 489a10b530..d82ba22bef 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -48,7 +48,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { common_runtime::spawn_bg(async move { let mut pusher_key = None; while let Some(msg) = in_stream.next().await { - let mut quit = false; + let mut is_not_leader = false; match msg { Ok(req) => { let header = match req.header.as_ref() { @@ -74,9 +74,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { .await .map_err(|e| e.into()); - if let Ok(res) = &res { - quit = res.is_not_leader(); - } + is_not_leader = res.as_ref().map_or(false, |r| r.is_not_leader()); tx.send(res).await.expect("working rx"); } @@ -96,7 +94,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { } } - if quit { + if is_not_leader { warn!("Quit because it is no longer the leader"); break; }