diff --git a/Cargo.lock b/Cargo.lock index d42e699cbb..6fcfb0d990 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3822,7 +3822,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e8abf8241c908448dce595399e89c89a40d048bd#e8abf8241c908448dce595399e89c89a40d048bd" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1552a21e77752f1baf8062b937584b80de84e0b3#1552a21e77752f1baf8062b937584b80de84e0b3" dependencies = [ "prost", "tonic 0.9.2", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index b4b0523959..d8e718b73e 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e8abf8241c908448dce595399e89c89a40d048bd" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1552a21e77752f1baf8062b937584b80de84e0b3" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 98fbca803d..a0b0c38e52 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -73,9 +73,9 @@ pub struct HeartbeatAccumulator { } impl HeartbeatAccumulator { - pub fn into_mailbox_messages(self) -> Vec { + pub fn into_mailbox_message(self) -> Option { // TODO(jiachun): to HeartbeatResponse payload - vec![] + None } } @@ -173,7 +173,7 @@ impl HeartbeatHandlerGroup { let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { header, - mailbox_messages: acc.into_mailbox_messages(), + mailbox_message: acc.into_mailbox_message(), }; Ok(res) } @@ -275,7 +275,7 @@ impl Mailbox for HeartbeatMailbox { msg.id = message_id; let res = HeartbeatResponse { header: Some(header), - mailbox_messages: vec![msg], + mailbox_message: Some(msg), }; pusher.push(res).await?; @@ -376,8 +376,9 @@ mod tests { .unwrap(); let recv_obj = pusher_rx.recv().await.unwrap().unwrap(); - assert_eq!(recv_obj.mailbox_messages[0].timestamp_millis, 123); - assert_eq!(recv_obj.mailbox_messages[0].subject, "req-test".to_string()); + let message = recv_obj.mailbox_message.unwrap(); + assert_eq!(message.timestamp_millis, 123); + assert_eq!(message.subject, "req-test".to_string()); (mailbox, receiver) } diff --git a/src/meta-srv/src/handler/mailbox_handler.rs b/src/meta-srv/src/handler/mailbox_handler.rs index 6bcc0ff4c8..973e869411 100644 --- a/src/meta-srv/src/handler/mailbox_handler.rs +++ b/src/meta-srv/src/handler/mailbox_handler.rs @@ -33,13 +33,8 @@ impl HeartbeatHandler for MailboxHandler { ctx: &mut Context, _acc: &mut HeartbeatAccumulator, ) -> Result<()> { - if req.mailbox_messages.is_empty() { - return Ok(()); - } - - let mailbox_messages = req.mailbox_messages.clone(); - for msg in mailbox_messages { - ctx.mailbox.on_recv(msg.id, Ok(msg)).await?; + if let Some(message) = &req.mailbox_message { + ctx.mailbox.on_recv(message.id, Ok(message.clone())).await?; } Ok(()) diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 733c086129..28fffa3c29 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -90,7 +90,7 @@ mod tests { let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { header, - mailbox_messages: acc.into_mailbox_messages(), + mailbox_message: acc.into_mailbox_message(), }; assert_eq!(1, res.header.unwrap().cluster_id); }