mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 07:12:54 +00:00
refactor: change mailbox_messages to mailbox_message (#1557)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3822,7 +3822,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e8abf8241c908448dce595399e89c89a40d048bd#e8abf8241c908448dce595399e89c89a40d048bd"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1552a21e77752f1baf8062b937584b80de84e0b3#1552a21e77752f1baf8062b937584b80de84e0b3"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"tonic 0.9.2",
|
||||
|
||||
@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-time = { path = "../common/time" }
|
||||
datatypes = { path = "../datatypes" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e8abf8241c908448dce595399e89c89a40d048bd" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1552a21e77752f1baf8062b937584b80de84e0b3" }
|
||||
prost.workspace = true
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
tonic.workspace = true
|
||||
|
||||
@@ -73,9 +73,9 @@ pub struct HeartbeatAccumulator {
|
||||
}
|
||||
|
||||
impl HeartbeatAccumulator {
|
||||
pub fn into_mailbox_messages(self) -> Vec<MailboxMessage> {
|
||||
pub fn into_mailbox_message(self) -> Option<MailboxMessage> {
|
||||
// TODO(jiachun): to HeartbeatResponse payload
|
||||
vec![]
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +173,7 @@ impl HeartbeatHandlerGroup {
|
||||
let header = std::mem::take(&mut acc.header);
|
||||
let res = HeartbeatResponse {
|
||||
header,
|
||||
mailbox_messages: acc.into_mailbox_messages(),
|
||||
mailbox_message: acc.into_mailbox_message(),
|
||||
};
|
||||
Ok(res)
|
||||
}
|
||||
@@ -275,7 +275,7 @@ impl Mailbox for HeartbeatMailbox {
|
||||
msg.id = message_id;
|
||||
let res = HeartbeatResponse {
|
||||
header: Some(header),
|
||||
mailbox_messages: vec![msg],
|
||||
mailbox_message: Some(msg),
|
||||
};
|
||||
|
||||
pusher.push(res).await?;
|
||||
@@ -376,8 +376,9 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let recv_obj = pusher_rx.recv().await.unwrap().unwrap();
|
||||
assert_eq!(recv_obj.mailbox_messages[0].timestamp_millis, 123);
|
||||
assert_eq!(recv_obj.mailbox_messages[0].subject, "req-test".to_string());
|
||||
let message = recv_obj.mailbox_message.unwrap();
|
||||
assert_eq!(message.timestamp_millis, 123);
|
||||
assert_eq!(message.subject, "req-test".to_string());
|
||||
|
||||
(mailbox, receiver)
|
||||
}
|
||||
|
||||
@@ -33,13 +33,8 @@ impl HeartbeatHandler for MailboxHandler {
|
||||
ctx: &mut Context,
|
||||
_acc: &mut HeartbeatAccumulator,
|
||||
) -> Result<()> {
|
||||
if req.mailbox_messages.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mailbox_messages = req.mailbox_messages.clone();
|
||||
for msg in mailbox_messages {
|
||||
ctx.mailbox.on_recv(msg.id, Ok(msg)).await?;
|
||||
if let Some(message) = &req.mailbox_message {
|
||||
ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -90,7 +90,7 @@ mod tests {
|
||||
let header = std::mem::take(&mut acc.header);
|
||||
let res = HeartbeatResponse {
|
||||
header,
|
||||
mailbox_messages: acc.into_mailbox_messages(),
|
||||
mailbox_message: acc.into_mailbox_message(),
|
||||
};
|
||||
assert_eq!(1, res.header.unwrap().cluster_id);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user