fix: sequence out of range (#1597)

This commit is contained in:
JeremyHi
2023-05-17 14:43:54 +08:00
committed by GitHub
parent 4920836021
commit eb95a9e78b
6 changed files with 59 additions and 20 deletions

View File

@@ -154,6 +154,14 @@ pub enum Error {
#[snafu(display("Failed to get sequence: {}", err_msg))]
NextSequence { err_msg: String, location: Location },
#[snafu(display("Sequence out of range: {}, start={}, step={}", name, start, step))]
SequenceOutOfRange {
name: String,
start: u64,
step: u64,
location: Location,
},
#[snafu(display("MetaSrv has no leader at this moment"))]
NoLeader { location: Location },
@@ -379,6 +387,7 @@ impl ErrorExt for Error {
| Error::UnexceptedSequenceValue { .. }
| Error::TableRouteNotFound { .. }
| Error::NextSequence { .. }
| Error::SequenceOutOfRange { .. }
| Error::MoveValue { .. }
| Error::InvalidKvsLength { .. }
| Error::InvalidTxnResult { .. }

View File

@@ -241,6 +241,18 @@ impl HeartbeatMailbox {
}
}
}
#[inline]
async fn next_message_id(&self) -> Result<u64> {
// In this implementation, we pre-occupy the message_id of 0,
// and we use `message_id = 0` to mark a Message as a one-way call.
loop {
let next = self.sequence.next().await?;
if next > 0 {
return Ok(next);
}
}
}
}
#[async_trait::async_trait]
@@ -251,7 +263,7 @@ impl Mailbox for HeartbeatMailbox {
mut msg: MailboxMessage,
timeout: Duration,
) -> Result<MailboxReceiver> {
let message_id = self.sequence.next().await?;
let message_id = self.next_message_id().await?;
let pusher_id = match ch {
Channel::Datanode(id) => format!("{}-{}", Role::Datanode as i32, id),
@@ -288,15 +300,7 @@ impl Mailbox for HeartbeatMailbox {
tx.send(maybe_msg)
.map_err(|_| error::MailboxClosedSnafu { id }.build())?;
} else if let Ok(finally_msg) = maybe_msg {
let MailboxMessage {
id,
subject,
from,
to,
timestamp_millis,
..
} = finally_msg;
warn!("The response arrived too late, id={id}, subject={subject}, from={from}, to={to}, timestamp={timestamp_millis}");
warn!("The response arrived too late: {finally_msg:?}");
}
Ok(())

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use api::v1::meta::{Error, HeartbeatRequest, Role};
use common_telemetry::warn;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
@@ -29,7 +30,7 @@ impl HeartbeatHandler for CheckLeaderHandler {
async fn handle(
&self,
_req: &HeartbeatRequest,
req: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
@@ -40,6 +41,7 @@ impl HeartbeatHandler for CheckLeaderHandler {
if let Some(header) = &mut acc.header {
header.error = Some(Error::is_not_leader());
ctx.set_skip_all();
warn!("Received a heartbeat {:?}, but the current node is not the leader, so the heartbeat will be ignored.", req.header);
}
}
Ok(())

View File

@@ -161,7 +161,7 @@ impl MetaSrvBuilder {
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 0, 100, kv_store.clone());
let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone());
let mailbox = HeartbeatMailbox::create(handler_group.pushers(), mailbox_sequence);
MetaSrv {

View File

@@ -16,7 +16,7 @@ use std::ops::Range;
use std::sync::Arc;
use api::v1::meta::CompareAndPutRequest;
use snafu::ensure;
use snafu::{ensure, OptionExt};
use tokio::sync::Mutex;
use crate::error::{self, Result};
@@ -106,7 +106,15 @@ impl Inner {
} else {
u64::to_le_bytes(start).to_vec()
};
let value = u64::to_le_bytes(start + self.step);
let value = start
.checked_add(self.step)
.context(error::SequenceOutOfRangeSnafu {
name: &self.name,
start,
step: self.step,
})?;
let value = u64::to_le_bytes(value);
let req = CompareAndPutRequest {
key: key.to_vec(),
@@ -170,7 +178,25 @@ mod tests {
}
#[tokio::test]
async fn test_sequence_fouce_quit() {
async fn test_sequence_out_of_rage() {
let kv_store = Arc::new(MemStore::new());
let initial = u64::MAX - 10;
let seq = Sequence::new("test_seq", initial, 10, kv_store);
for _ in 0..10 {
let _ = seq.next().await.unwrap();
}
let res = seq.next().await;
assert!(res.is_err());
assert!(matches!(
res.unwrap_err(),
error::Error::SequenceOutOfRange { .. }
))
}
#[tokio::test]
async fn test_sequence_force_quit() {
struct Noop;
#[async_trait::async_trait]

View File

@@ -48,7 +48,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
common_runtime::spawn_bg(async move {
let mut pusher_key = None;
while let Some(msg) = in_stream.next().await {
let mut quit = false;
let mut is_not_leader = false;
match msg {
Ok(req) => {
let header = match req.header.as_ref() {
@@ -74,9 +74,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
.await
.map_err(|e| e.into());
if let Ok(res) = &res {
quit = res.is_not_leader();
}
is_not_leader = res.as_ref().map_or(false, |r| r.is_not_leader());
tx.send(res).await.expect("working rx");
}
@@ -96,7 +94,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
}
}
if quit {
if is_not_leader {
warn!("Quit because it is no longer the leader");
break;
}