From bcd63fdb87382f0d80cbd847579201096eae1858 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 27 Aug 2025 16:56:09 +0800 Subject: [PATCH] chore: cherry pick #6821 and bump version to v0.12.2 (#6832) * fix: correct heartbeat stream handling logic (#6821) * fix: correct heartbeat stream handling logic Signed-off-by: WenyXu * Update src/meta-srv/src/service/heartbeat.rs Co-authored-by: jeremyhi --------- Signed-off-by: WenyXu Co-authored-by: jeremyhi Signed-off-by: evenyag * chore: bump version to v0.12.2 Signed-off-by: evenyag * chore: fix typos Signed-off-by: evenyag --------- Signed-off-by: WenyXu Signed-off-by: evenyag Co-authored-by: Weny Xu Co-authored-by: jeremyhi --- Cargo.lock | 142 +++++++++++++------------- Cargo.toml | 2 +- src/flow/src/adapter/flownode_impl.rs | 2 +- src/flow/src/heartbeat.rs | 7 +- src/frontend/src/heartbeat.rs | 23 +++-- src/meta-srv/src/service/heartbeat.rs | 11 +- 6 files changed, 100 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 16e10489d5..047312b125 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c" [[package]] name = "api" -version = "0.12.1" +version = "0.12.2" dependencies = [ "common-base", "common-decimal", @@ -710,7 +710,7 @@ dependencies = [ [[package]] name = "auth" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "async-trait", @@ -1324,7 +1324,7 @@ dependencies = [ [[package]] name = "cache" -version = "0.12.1" +version = "0.12.2" dependencies = [ "catalog", "common-error", @@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "catalog" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arrow", @@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "cli" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "auth", @@ -1703,7 +1703,7 @@ dependencies = [ "session", "snafu 0.8.5", "store-api", - "substrait 0.12.1", + "substrait 0.12.2", "table", "tempfile", "tokio", @@ -1712,7 +1712,7 @@ dependencies = [ [[package]] name = "client" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arc-swap", @@ -1739,7 +1739,7 @@ dependencies = [ "rand", "serde_json", "snafu 0.8.5", - "substrait 0.12.1", + "substrait 0.12.2", "substrait 0.37.3", "tokio", "tokio-stream", @@ -1780,7 +1780,7 @@ dependencies = [ [[package]] name = "cmd" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "auth", @@ -1841,7 +1841,7 @@ dependencies = [ "similar-asserts", "snafu 0.8.5", "store-api", - "substrait 0.12.1", + "substrait 0.12.2", "table", "temp-env", "tempfile", @@ -1887,7 +1887,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335" [[package]] name = "common-base" -version = "0.12.1" +version = "0.12.2" dependencies = [ "anymap2", "async-trait", @@ -1909,11 +1909,11 @@ dependencies = [ [[package]] name = "common-catalog" -version = "0.12.1" +version = "0.12.2" [[package]] name = "common-config" -version = "0.12.1" +version = "0.12.2" dependencies = [ "common-base", "common-error", @@ -1938,7 +1938,7 @@ dependencies = [ [[package]] name = "common-datasource" -version = "0.12.1" +version = "0.12.2" dependencies = [ "arrow", "arrow-schema", @@ -1974,7 +1974,7 @@ dependencies = [ [[package]] name = "common-decimal" -version = "0.12.1" +version = "0.12.2" dependencies = [ "bigdecimal 0.4.5", "common-error", @@ -1987,7 +1987,7 @@ dependencies = [ [[package]] name = "common-error" -version = "0.12.1" +version = "0.12.2" dependencies = [ "http 1.1.0", "snafu 0.8.5", @@ -1997,7 +1997,7 @@ dependencies = [ [[package]] name = "common-frontend" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "common-error", @@ -2007,7 +2007,7 @@ dependencies = [ [[package]] name = "common-function" -version = "0.12.1" +version = "0.12.2" dependencies = [ "ahash 0.8.11", "api", @@ -2055,7 +2055,7 @@ dependencies = [ [[package]] name = "common-greptimedb-telemetry" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "common-runtime", @@ -2072,7 +2072,7 @@ dependencies = [ [[package]] name = "common-grpc" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arrow-flight", @@ -2100,7 +2100,7 @@ dependencies = [ [[package]] name = "common-grpc-expr" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "common-base", @@ -2119,7 +2119,7 @@ dependencies = [ [[package]] name = "common-macro" -version = "0.12.1" +version = "0.12.2" dependencies = [ "arc-swap", "common-query", @@ -2133,7 +2133,7 @@ dependencies = [ [[package]] name = "common-mem-prof" -version = "0.12.1" +version = "0.12.2" dependencies = [ "common-error", "common-macro", @@ -2146,7 +2146,7 @@ dependencies = [ [[package]] name = "common-meta" -version = "0.12.1" +version = "0.12.2" dependencies = [ "anymap2", "api", @@ -2206,7 +2206,7 @@ dependencies = [ [[package]] name = "common-options" -version = "0.12.1" +version = "0.12.2" dependencies = [ "common-grpc", "humantime-serde", @@ -2215,11 +2215,11 @@ dependencies = [ [[package]] name = "common-plugins" -version = "0.12.1" +version = "0.12.2" [[package]] name = "common-pprof" -version = "0.12.1" +version = "0.12.2" dependencies = [ "common-error", "common-macro", @@ -2231,7 +2231,7 @@ dependencies = [ [[package]] name = "common-procedure" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-stream", "async-trait", @@ -2258,7 +2258,7 @@ dependencies = [ [[package]] name = "common-procedure-test" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "common-procedure", @@ -2266,7 +2266,7 @@ dependencies = [ [[package]] name = "common-query" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "async-trait", @@ -2292,7 +2292,7 @@ dependencies = [ [[package]] name = "common-recordbatch" -version = "0.12.1" +version = "0.12.2" dependencies = [ "arc-swap", "common-error", @@ -2311,7 +2311,7 @@ dependencies = [ [[package]] name = "common-runtime" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "clap 4.5.19", @@ -2341,7 +2341,7 @@ dependencies = [ [[package]] name = "common-telemetry" -version = "0.12.1" +version = "0.12.2" dependencies = [ "atty", "backtrace", @@ -2369,7 +2369,7 @@ dependencies = [ [[package]] name = "common-test-util" -version = "0.12.1" +version = "0.12.2" dependencies = [ "client", "common-query", @@ -2381,7 +2381,7 @@ dependencies = [ [[package]] name = "common-time" -version = "0.12.1" +version = "0.12.2" dependencies = [ "arrow", "chrono", @@ -2399,7 +2399,7 @@ dependencies = [ [[package]] name = "common-version" -version = "0.12.1" +version = "0.12.2" dependencies = [ "build-data", "const_format", @@ -2409,7 +2409,7 @@ dependencies = [ [[package]] name = "common-wal" -version = "0.12.1" +version = "0.12.2" dependencies = [ "common-base", "common-error", @@ -3340,7 +3340,7 @@ dependencies = [ [[package]] name = "datanode" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arrow-flight", @@ -3392,7 +3392,7 @@ dependencies = [ "session", "snafu 0.8.5", "store-api", - "substrait 0.12.1", + "substrait 0.12.2", "table", "tokio", "toml 0.8.19", @@ -3401,7 +3401,7 @@ dependencies = [ [[package]] name = "datatypes" -version = "0.12.1" +version = "0.12.2" dependencies = [ "arrow", "arrow-array", @@ -4045,7 +4045,7 @@ dependencies = [ [[package]] name = "file-engine" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "async-trait", @@ -4155,7 +4155,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8" [[package]] name = "flow" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arrow", @@ -4216,7 +4216,7 @@ dependencies = [ "snafu 0.8.5", "store-api", "strum 0.25.0", - "substrait 0.12.1", + "substrait 0.12.2", "table", "tokio", "tonic 0.12.3", @@ -4271,7 +4271,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" [[package]] name = "frontend" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arc-swap", @@ -5539,7 +5539,7 @@ dependencies = [ [[package]] name = "index" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "asynchronous-codec", @@ -6331,7 +6331,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "log-query" -version = "0.12.1" +version = "0.12.2" dependencies = [ "chrono", "common-error", @@ -6343,7 +6343,7 @@ dependencies = [ [[package]] name = "log-store" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-stream", "async-trait", @@ -6636,7 +6636,7 @@ dependencies = [ [[package]] name = "meta-client" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "async-trait", @@ -6663,7 +6663,7 @@ dependencies = [ [[package]] name = "meta-srv" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "async-trait", @@ -6749,7 +6749,7 @@ dependencies = [ [[package]] name = "metric-engine" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "aquamarine", @@ -6847,7 +6847,7 @@ dependencies = [ [[package]] name = "mito2" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "aquamarine", @@ -7544,7 +7544,7 @@ dependencies = [ [[package]] name = "object-store" -version = "0.12.1" +version = "0.12.2" dependencies = [ "anyhow", "bytes", @@ -7793,7 +7793,7 @@ dependencies = [ [[package]] name = "operator" -version = "0.12.1" +version = "0.12.2" dependencies = [ "ahash 0.8.11", "api", @@ -7841,7 +7841,7 @@ dependencies = [ "sql", "sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)", "store-api", - "substrait 0.12.1", + "substrait 0.12.2", "table", "tokio", "tokio-util", @@ -8078,7 +8078,7 @@ dependencies = [ [[package]] name = "partition" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "async-trait", @@ -8346,7 +8346,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeline" -version = "0.12.1" +version = "0.12.2" dependencies = [ "ahash 0.8.11", "api", @@ -8486,7 +8486,7 @@ dependencies = [ [[package]] name = "plugins" -version = "0.12.1" +version = "0.12.2" dependencies = [ "auth", "clap 4.5.19", @@ -8748,7 +8748,7 @@ dependencies = [ [[package]] name = "promql" -version = "0.12.1" +version = "0.12.2" dependencies = [ "ahash 0.8.11", "async-trait", @@ -8993,7 +8993,7 @@ dependencies = [ [[package]] name = "puffin" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-compression 0.4.13", "async-trait", @@ -9034,7 +9034,7 @@ dependencies = [ [[package]] name = "query" -version = "0.12.1" +version = "0.12.2" dependencies = [ "ahash 0.8.11", "api", @@ -9099,7 +9099,7 @@ dependencies = [ "sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)", "statrs", "store-api", - "substrait 0.12.1", + "substrait 0.12.2", "table", "tokio", "tokio-stream", @@ -10444,7 +10444,7 @@ dependencies = [ [[package]] name = "servers" -version = "0.12.1" +version = "0.12.2" dependencies = [ "ahash 0.8.11", "api", @@ -10561,7 +10561,7 @@ dependencies = [ [[package]] name = "session" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arc-swap", @@ -10870,7 +10870,7 @@ dependencies = [ [[package]] name = "sql" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "chrono", @@ -10924,7 +10924,7 @@ dependencies = [ [[package]] name = "sqlness-runner" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "clap 4.5.19", @@ -11241,7 +11241,7 @@ dependencies = [ [[package]] name = "store-api" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "aquamarine", @@ -11371,7 +11371,7 @@ dependencies = [ [[package]] name = "substrait" -version = "0.12.1" +version = "0.12.2" dependencies = [ "async-trait", "bytes", @@ -11552,7 +11552,7 @@ dependencies = [ [[package]] name = "table" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "async-trait", @@ -11803,7 +11803,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "tests-fuzz" -version = "0.12.1" +version = "0.12.2" dependencies = [ "arbitrary", "async-trait", @@ -11847,7 +11847,7 @@ dependencies = [ [[package]] name = "tests-integration" -version = "0.12.1" +version = "0.12.2" dependencies = [ "api", "arrow-flight", @@ -11913,7 +11913,7 @@ dependencies = [ "sql", "sqlx", "store-api", - "substrait 0.12.1", + "substrait 0.12.2", "table", "tempfile", "time", diff --git a/Cargo.toml b/Cargo.toml index ed61854f05..2ba43328af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,7 +67,7 @@ members = [ resolver = "2" [workspace.package] -version = "0.12.1" +version = "0.12.2" edition = "2021" license = "Apache-2.0" diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 7f4a4267cb..1e323369f3 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -133,7 +133,7 @@ impl Flownode for FlowWorkerManager { .map_err(to_meta_err(snafu::location!()))?; debug!( - "Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed", + "Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed", flow_id, flushed_input_rows, rows_send, row ); Ok(FlowResponse { diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 45786a4d80..8dd67027f5 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -214,6 +214,7 @@ impl HeartbeatTask { if let Some(message) = message { Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report) } else { + warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop break } @@ -255,7 +256,11 @@ impl HeartbeatTask { error!(e; "Error while handling heartbeat response"); } } - Ok(None) => break, + Ok(None) => { + warn!("Heartbeat response stream closed"); + capture_self.start_with_retry(retry_interval).await; + break; + } Err(e) => { error!(e; "Occur error while reading heartbeat response"); capture_self.start_with_retry(retry_interval).await; diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 47dac786b2..d60eb79125 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; @@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; pub struct HeartbeatTask { peer_addr: String, meta_client: Arc, - report_interval: u64, - retry_interval: u64, + report_interval: Duration, + retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, } @@ -58,8 +58,8 @@ impl HeartbeatTask { HeartbeatTask { peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)), meta_client, - report_interval: heartbeat_opts.interval.as_millis() as u64, - retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, + report_interval: heartbeat_opts.interval, + retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, } @@ -103,13 +103,15 @@ impl HeartbeatTask { HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc(); } } - Ok(None) => break, + Ok(None) => { + warn!("Heartbeat response stream closed"); + capture_self.start_with_retry(retry_interval).await; + break; + } Err(e) => { HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc(); error!(e; "Occur error while reading heartbeat response"); - capture_self - .start_with_retry(Duration::from_millis(retry_interval)) - .await; + capture_self.start_with_retry(retry_interval).await; break; } @@ -177,12 +179,13 @@ impl HeartbeatTask { if let Some(message) = message { Self::new_heartbeat_request(&heartbeat_request, Some(message)) } else { + warn!("Sender has been dropped, exiting the heartbeat loop"); // Receives None that means Sender was dropped, we need to break the current loop break } } _ = &mut sleep => { - sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); + sleep.as_mut().reset(Instant::now() + report_interval); Self::new_heartbeat_request(&heartbeat_request, None) } }; diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 45adb5f57e..a24fb4967f 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -27,10 +27,9 @@ use snafu::OptionExt; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Streaming}; +use tonic::{Request, Response, Status, Streaming}; -use crate::error; -use crate::error::Result; +use crate::error::{self, Result}; use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId}; use crate::metasrv::{Context, Metasrv}; use crate::metrics::METRIC_META_HEARTBEAT_RECV; @@ -109,6 +108,12 @@ impl heartbeat_server::Heartbeat for Metasrv { if is_not_leader { warn!("Quit because it is no longer the leader"); + let _ = tx + .send(Err(Status::aborted(format!( + "The requested metasrv node is not leader, node addr: {}", + ctx.server_addr + )))) + .await; break; } }