chore: cherry pick #6821 and bump version to v0.12.2 (#6832)

* fix: correct heartbeat stream handling logic  (#6821)

* fix: correct heartbeat stream handling logic

Signed-off-by: WenyXu <wenymedia@gmail.com>

* Update src/meta-srv/src/service/heartbeat.rs

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: bump version to v0.12.2

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix typos

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Yingwen
2025-08-27 16:56:09 +08:00
committed by GitHub
parent f4c527cddf
commit bcd63fdb87
6 changed files with 100 additions and 87 deletions

142
Cargo.lock generated
View File

@@ -185,7 +185,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"common-base",
"common-decimal",
@@ -710,7 +710,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"async-trait",
@@ -1324,7 +1324,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"catalog",
"common-error",
@@ -1348,7 +1348,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arrow",
@@ -1661,7 +1661,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"auth",
@@ -1703,7 +1703,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.1",
"substrait 0.12.2",
"table",
"tempfile",
"tokio",
@@ -1712,7 +1712,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arc-swap",
@@ -1739,7 +1739,7 @@ dependencies = [
"rand",
"serde_json",
"snafu 0.8.5",
"substrait 0.12.1",
"substrait 0.12.2",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -1780,7 +1780,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"auth",
@@ -1841,7 +1841,7 @@ dependencies = [
"similar-asserts",
"snafu 0.8.5",
"store-api",
"substrait 0.12.1",
"substrait 0.12.2",
"table",
"temp-env",
"tempfile",
@@ -1887,7 +1887,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"anymap2",
"async-trait",
@@ -1909,11 +1909,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.12.1"
version = "0.12.2"
[[package]]
name = "common-config"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"common-base",
"common-error",
@@ -1938,7 +1938,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"arrow",
"arrow-schema",
@@ -1974,7 +1974,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"bigdecimal 0.4.5",
"common-error",
@@ -1987,7 +1987,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"http 1.1.0",
"snafu 0.8.5",
@@ -1997,7 +1997,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"common-error",
@@ -2007,7 +2007,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2055,7 +2055,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"common-runtime",
@@ -2072,7 +2072,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arrow-flight",
@@ -2100,7 +2100,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"common-base",
@@ -2119,7 +2119,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"arc-swap",
"common-query",
@@ -2133,7 +2133,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"common-error",
"common-macro",
@@ -2146,7 +2146,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"anymap2",
"api",
@@ -2206,7 +2206,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2215,11 +2215,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.12.1"
version = "0.12.2"
[[package]]
name = "common-pprof"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"common-error",
"common-macro",
@@ -2231,7 +2231,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-stream",
"async-trait",
@@ -2258,7 +2258,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"common-procedure",
@@ -2266,7 +2266,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"async-trait",
@@ -2292,7 +2292,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"arc-swap",
"common-error",
@@ -2311,7 +2311,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2341,7 +2341,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"atty",
"backtrace",
@@ -2369,7 +2369,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"client",
"common-query",
@@ -2381,7 +2381,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"arrow",
"chrono",
@@ -2399,7 +2399,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"build-data",
"const_format",
@@ -2409,7 +2409,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"common-base",
"common-error",
@@ -3340,7 +3340,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arrow-flight",
@@ -3392,7 +3392,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.12.1",
"substrait 0.12.2",
"table",
"tokio",
"toml 0.8.19",
@@ -3401,7 +3401,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"arrow",
"arrow-array",
@@ -4045,7 +4045,7 @@ dependencies = [
[[package]]
name = "file-engine"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"async-trait",
@@ -4155,7 +4155,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arrow",
@@ -4216,7 +4216,7 @@ dependencies = [
"snafu 0.8.5",
"store-api",
"strum 0.25.0",
"substrait 0.12.1",
"substrait 0.12.2",
"table",
"tokio",
"tonic 0.12.3",
@@ -4271,7 +4271,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arc-swap",
@@ -5539,7 +5539,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6331,7 +6331,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"chrono",
"common-error",
@@ -6343,7 +6343,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-stream",
"async-trait",
@@ -6636,7 +6636,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"async-trait",
@@ -6663,7 +6663,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"async-trait",
@@ -6749,7 +6749,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"aquamarine",
@@ -6847,7 +6847,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"aquamarine",
@@ -7544,7 +7544,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"anyhow",
"bytes",
@@ -7793,7 +7793,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -7841,7 +7841,7 @@ dependencies = [
"sql",
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"store-api",
"substrait 0.12.1",
"substrait 0.12.2",
"table",
"tokio",
"tokio-util",
@@ -8078,7 +8078,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"async-trait",
@@ -8346,7 +8346,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8486,7 +8486,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"auth",
"clap 4.5.19",
@@ -8748,7 +8748,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -8993,7 +8993,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9034,7 +9034,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9099,7 +9099,7 @@ dependencies = [
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
"statrs",
"store-api",
"substrait 0.12.1",
"substrait 0.12.2",
"table",
"tokio",
"tokio-stream",
@@ -10444,7 +10444,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"ahash 0.8.11",
"api",
@@ -10561,7 +10561,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arc-swap",
@@ -10870,7 +10870,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"chrono",
@@ -10924,7 +10924,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11241,7 +11241,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"aquamarine",
@@ -11371,7 +11371,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"async-trait",
"bytes",
@@ -11552,7 +11552,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"async-trait",
@@ -11803,7 +11803,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"arbitrary",
"async-trait",
@@ -11847,7 +11847,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.12.1"
version = "0.12.2"
dependencies = [
"api",
"arrow-flight",
@@ -11913,7 +11913,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.12.1",
"substrait 0.12.2",
"table",
"tempfile",
"time",

View File

@@ -67,7 +67,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.12.1"
version = "0.12.2"
edition = "2021"
license = "Apache-2.0"

View File

@@ -133,7 +133,7 @@ impl Flownode for FlowWorkerManager {
.map_err(to_meta_err(snafu::location!()))?;
debug!(
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sended and {} output rows flushed",
"Done to flush flow_id={:?} with {} input rows flushed, {} rows sent and {} output rows flushed",
flow_id, flushed_input_rows, rows_send, row
);
Ok(FlowResponse {

View File

@@ -214,6 +214,7 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
@@ -255,7 +256,11 @@ impl HeartbeatTask {
error!(e; "Error while handling heartbeat response");
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self.start_with_retry(retry_interval).await;

View File

@@ -23,7 +23,7 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use common_telemetry::{debug, error, info, warn};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
@@ -42,8 +42,8 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub struct HeartbeatTask {
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
}
@@ -58,8 +58,8 @@ impl HeartbeatTask {
HeartbeatTask {
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
}
@@ -103,13 +103,15 @@ impl HeartbeatTask {
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
}
}
Ok(None) => break,
Ok(None) => {
warn!("Heartbeat response stream closed");
capture_self.start_with_retry(retry_interval).await;
break;
}
Err(e) => {
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_millis(retry_interval))
.await;
capture_self.start_with_retry(retry_interval).await;
break;
}
@@ -177,12 +179,13 @@ impl HeartbeatTask {
if let Some(message) = message {
Self::new_heartbeat_request(&heartbeat_request, Some(message))
} else {
warn!("Sender has been dropped, exiting the heartbeat loop");
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
sleep.as_mut().reset(Instant::now() + report_interval);
Self::new_heartbeat_request(&heartbeat_request, None)
}
};

View File

@@ -27,10 +27,9 @@ use snafu::OptionExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Streaming};
use tonic::{Request, Response, Status, Streaming};
use crate::error;
use crate::error::Result;
use crate::error::{self, Result};
use crate::handler::{HeartbeatHandlerGroup, Pusher, PusherId};
use crate::metasrv::{Context, Metasrv};
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
@@ -109,6 +108,12 @@ impl heartbeat_server::Heartbeat for Metasrv {
if is_not_leader {
warn!("Quit because it is no longer the leader");
let _ = tx
.send(Err(Status::aborted(format!(
"The requested metasrv node is not leader, node addr: {}",
ctx.server_addr
))))
.await;
break;
}
}