feat: support env vars in heartbeat (#8064)

* feat: support reporting env vars in heartbeat messages to metasrv

Add `heartbeat_env_vars` config option for datanode and frontend. When
configured, the specified environment variable values are read at startup
and sent to metasrv in every heartbeat via the `extensions` map. Metasrv
extracts and stores them in `NodeInfo` for use in routing decisions
(e.g. AZ-aware region placement).

- Add `EnvVars` helper in `common/meta/src/datanode.rs` following the
  existing `GcStat` extension pattern with `into_extensions`/`from_extensions`
- Add `env_vars: HashMap<String, String>` field to `NodeInfo` in
  `common/meta/src/cluster.rs` with `#[serde(default)]` for backward compat
- Add `heartbeat_env_vars: Vec<String>` config field to `DatanodeOptions`,
  `FrontendOptions`, and `StandaloneOptions`
- Inject env vars into heartbeat `extensions` in both datanode and frontend
  heartbeat tasks (`datanode/src/heartbeat.rs`, `frontend/src/heartbeat.rs`)
- Extract env vars from `req.extensions` in all three metasrv
  `CollectXxxClusterInfoHandler`s
- Update `NodeInfo` construction sites in `meta-client`,
  `discovery/lease.rs`, and `standalone/information_extension.rs`
- Update expected TOML output in `tests-integration/tests/http.rs`
- Add unit tests for `EnvVars` round-trip and `NodeInfo` backward compat

Signed-off-by: Lei, HUANG <leih@nvidia.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: address heartbeat env review feedback

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: log error on deserialization failure

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: send heartbeat env vars once

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: resend heartbeat env vars after reconnect

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* revert: keep env vars in every heartbeat

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <leih@nvidia.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-05-08 15:37:53 +08:00
committed by GitHub
parent d1873ca31d
commit 42aa58aa27
14 changed files with 227 additions and 13 deletions

View File

@@ -16,7 +16,7 @@ use std::time::Duration;
use cmd::options::GreptimeOptions;
use common_base::memory_limit::MemoryLimit;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_config::{Configurable, DEFAULT_DATA_HOME, ENV_VAR_SEP};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions};
use common_wal::config::DatanodeWalConfig;
@@ -311,3 +311,25 @@ fn test_load_standalone_example_config() {
};
similar_asserts::assert_eq!(options, expected);
}
#[test]
fn test_load_heartbeat_env_vars_from_env() {
let env_prefix = "HEARTBEAT_ENV_VARS_UT";
let env_key = [env_prefix, "HEARTBEAT_ENV_VARS"].join(ENV_VAR_SEP);
temp_env::with_var(env_key, Some("AZ,REGION"), || {
let expected = vec!["AZ".to_string(), "REGION".to_string()];
let datanode =
GreptimeOptions::<DatanodeOptions>::load_layered_options(None, env_prefix).unwrap();
similar_asserts::assert_eq!(datanode.component.heartbeat_env_vars, expected);
let frontend =
GreptimeOptions::<FrontendOptions>::load_layered_options(None, env_prefix).unwrap();
similar_asserts::assert_eq!(frontend.component.heartbeat_env_vars, expected);
let standalone =
GreptimeOptions::<StandaloneOptions>::load_layered_options(None, env_prefix).unwrap();
similar_asserts::assert_eq!(standalone.component.heartbeat_env_vars, expected);
});
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
@@ -134,6 +135,9 @@ pub struct NodeInfo {
// The node build hostname
#[serde(default)]
pub hostname: String,
/// Environment variables reported by the node.
#[serde(default)]
pub env_vars: HashMap<String, String>,
}
#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
@@ -355,6 +359,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
@@ -451,6 +456,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
@@ -464,4 +470,59 @@ mod tests {
} if workloads.types == vec![7]
);
}
#[test]
fn test_node_info_backward_compatible_without_env_vars() {
// Simulate a NodeInfo serialized before env_vars was added
let raw = r#"{
"peer":{"id":1,"addr":"127.0.0.1"},
"last_activity_ts":123,
"status":{"Datanode":{"rcus":0,"wcus":0,"leader_regions":0,"follower_regions":0,"workloads":{"types":[0]}}},
"version":"",
"git_commit":"",
"start_time_ms":1,
"total_cpu_millicores":0,
"total_memory_bytes":0,
"cpu_usage_millicores":0,
"memory_usage_bytes":0,
"hostname":"test"
}"#;
let node_info: NodeInfo = raw.parse().unwrap();
assert!(node_info.env_vars.is_empty());
}
#[test]
fn test_node_info_with_env_vars_round_trip() {
let mut env_vars = HashMap::new();
env_vars.insert("AZ".to_string(), "us-east-1a".to_string());
let node_info = NodeInfo {
peer: Peer {
id: 1,
addr: "127.0.0.1".to_string(),
},
last_activity_ts: 123,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads: DatanodeWorkloads { types: vec![] },
}),
version: "".to_string(),
git_commit: "".to_string(),
start_time_ms: 1,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test".to_string(),
env_vars,
};
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
assert_eq!(new_node_info.env_vars.get("AZ").unwrap(), "us-east-1a");
}
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
@@ -371,6 +371,48 @@ impl GcStat {
}
}
/// Environment variables reported by a node in heartbeat messages.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EnvVars {
pub vars: HashMap<String, String>,
}
impl EnvVars {
pub const ENV_VARS_KEY: &str = "__env_vars";
pub fn new(vars: HashMap<String, String>) -> Self {
Self { vars }
}
/// Read the configured env var keys from the environment and build an EnvVars.
pub fn from_config(keys: &[String]) -> Self {
let vars = keys
.iter()
.filter_map(|key| std::env::var(key).ok().map(|value| (key.clone(), value)))
.collect();
Self { vars }
}
pub fn into_extensions(&self, extensions: &mut HashMap<String, Vec<u8>>) {
if self.vars.is_empty() {
return;
}
let bytes = serde_json::to_vec(self).unwrap_or_default();
extensions.insert(Self::ENV_VARS_KEY.to_string(), bytes);
}
pub fn from_extensions(extensions: &HashMap<String, Vec<u8>>) -> Result<Option<Self>> {
extensions
.get(Self::ENV_VARS_KEY)
.map(|bytes| {
serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
input: String::from_utf8_lossy(bytes).to_string(),
})
})
.transpose()
}
}
/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.
@@ -596,4 +638,34 @@ mod tests {
assert_eq!(stat.region_stats.len(), 1);
assert_eq!(stat.region_stats[0].role, RegionRole::StagingLeader);
}
#[test]
fn test_env_vars_round_trip() {
let mut vars = HashMap::new();
vars.insert("AZ".to_string(), "us-east-1a".to_string());
vars.insert("REGION".to_string(), "us-east-1".to_string());
let env_vars = EnvVars::new(vars);
let mut extensions = HashMap::new();
env_vars.into_extensions(&mut extensions);
let extracted = EnvVars::from_extensions(&extensions).unwrap().unwrap();
assert_eq!(extracted.vars.get("AZ").unwrap(), "us-east-1a");
assert_eq!(extracted.vars.get("REGION").unwrap(), "us-east-1");
}
#[test]
fn test_env_vars_empty_not_written() {
let env_vars = EnvVars::default();
let mut extensions = HashMap::new();
env_vars.into_extensions(&mut extensions);
assert!(extensions.is_empty());
}
#[test]
fn test_env_vars_from_extensions_missing() {
let extensions = HashMap::new();
let result = EnvVars::from_extensions(&extensions).unwrap();
assert!(result.is_none());
}
}

View File

@@ -83,6 +83,10 @@ pub struct DatanodeOptions {
pub query: QueryOptions,
pub memory: MemoryOptions,
/// Environment variable keys to read and report in heartbeat messages.
/// The values of these env vars at startup will be sent to metasrv.
pub heartbeat_env_vars: Vec<String>,
/// Deprecated options, please use the new options instead.
#[deprecated(note = "Please use `grpc.bind_addr` instead.")]
pub rpc_addr: Option<String>,
@@ -136,6 +140,7 @@ impl Default for DatanodeOptions {
tracing: TracingOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
heartbeat_env_vars: vec![],
// Deprecated options
rpc_addr: None,
@@ -149,7 +154,11 @@ impl Default for DatanodeOptions {
impl Configurable for DatanodeOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
Some(&[
"heartbeat_env_vars",
"meta_client.metasrv_addrs",
"wal.broker_endpoints",
])
}
}

View File

@@ -21,7 +21,7 @@ use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_base::Plugins;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::datanode::{EnvVars, REGION_STATISTIC_KEY};
use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
@@ -66,6 +66,7 @@ pub struct HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
region_alive_keeper: Arc<RegionAliveKeeper>,
resource_stat: ResourceStatRef,
env_vars: EnvVars,
}
impl Drop for HeartbeatTask {
@@ -114,6 +115,7 @@ impl HeartbeatTask {
resp_handler_executor,
region_alive_keeper,
resource_stat,
env_vars: EnvVars::from_config(&opts.heartbeat_env_vars),
})
}
@@ -258,6 +260,8 @@ impl HeartbeatTask {
.mito_engine()
.context(RegionEngineNotFoundSnafu { name: "mito" })?
.gc_limiter();
let mut env_var_extensions = HashMap::new();
self.env_vars.into_extensions(&mut env_var_extensions);
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
@@ -300,7 +304,7 @@ impl HeartbeatTask {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let mut extensions = heartbeat_request.extensions.clone();
let mut extensions = env_var_extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);
@@ -328,7 +332,7 @@ impl HeartbeatTask {
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;
let mut extensions = heartbeat_request.extensions.clone();
let mut extensions = env_var_extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);

View File

@@ -72,6 +72,8 @@ pub struct FrontendOptions {
pub memory: MemoryOptions,
/// The event recorder options.
pub event_recorder: EventRecorderOptions,
/// Environment variable keys to read and report in heartbeat messages.
pub heartbeat_env_vars: Vec<String>,
}
impl Default for FrontendOptions {
@@ -101,13 +103,14 @@ impl Default for FrontendOptions {
slow_query: SlowQueryOptions::default(),
memory: MemoryOptions::default(),
event_recorder: EventRecorderOptions::default(),
heartbeat_env_vars: vec![],
}
}
}
impl Configurable for FrontendOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
Some(&["heartbeat_env_vars", "meta_client.metasrv_addrs"])
}
}

View File

@@ -18,6 +18,7 @@ mod tests;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
use common_meta::datanode::EnvVars;
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
@@ -45,6 +46,7 @@ pub struct HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
resource_stat: ResourceStatRef,
env_vars: EnvVars,
}
impl HeartbeatTask {
@@ -67,6 +69,7 @@ impl HeartbeatTask {
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
resource_stat,
env_vars: EnvVars::from_config(&opts.heartbeat_env_vars),
}
}
@@ -202,10 +205,14 @@ impl HeartbeatTask {
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
let resource_stat = self.resource_stat.clone();
let env_vars = self.env_vars.clone();
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
let mut extensions = std::collections::HashMap::new();
env_vars.into_extensions(&mut extensions);
let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(
@@ -213,6 +220,7 @@ impl HeartbeatTask {
total_cpu_millicores,
total_memory_bytes,
),
extensions,
..Default::default()
};

View File

@@ -460,6 +460,7 @@ impl ClusterInfo for MetaClient {
cpu_usage_millicores: node_info.cpu_usage_millicores,
memory_usage_bytes: node_info.memory_usage_bytes,
hostname: node_info.hostname,
env_vars: Default::default(),
}
} else {
// TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
@@ -475,6 +476,7 @@ impl ClusterInfo for MetaClient {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "".to_string(),
env_vars: Default::default(),
}
}
})
@@ -492,6 +494,7 @@ impl ClusterInfo for MetaClient {
cpu_usage_millicores: node_info.cpu_usage_millicores,
memory_usage_bytes: node_info.memory_usage_bytes,
hostname: node_info.hostname,
env_vars: Default::default(),
}
} else {
// TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto.
@@ -507,6 +510,7 @@ impl ClusterInfo for MetaClient {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "".to_string(),
env_vars: Default::default(),
}
}
}))

View File

@@ -169,6 +169,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars: Default::default(),
}
}

View File

@@ -379,6 +379,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
@@ -407,6 +408,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
in_memory
@@ -450,6 +452,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
in_memory
@@ -492,6 +495,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);

View File

@@ -12,13 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
use common_meta::cluster::{
DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
};
use common_meta::datanode::EnvVars;
use common_meta::heartbeat::utils::get_flownode_workloads;
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::region_engine::RegionRole;
@@ -42,7 +46,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some((key, peer, info)) = extract_base_info(req) else {
let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
return Ok(HandleControl::Continue);
};
@@ -58,6 +62,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
cpu_usage_millicores: info.cpu_usage_millicores,
memory_usage_bytes: info.memory_usage_bytes,
hostname: info.hostname,
env_vars,
};
put_into_memory_store(ctx, key, value).await?;
@@ -80,7 +85,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some((key, peer, info)) = extract_base_info(req) else {
let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
return Ok(HandleControl::Continue);
};
let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref());
@@ -99,6 +104,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
cpu_usage_millicores: info.cpu_usage_millicores,
memory_usage_bytes: info.memory_usage_bytes,
hostname: info.hostname,
env_vars,
};
put_into_memory_store(ctx, key, value).await?;
@@ -122,7 +128,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let Some((key, peer, info)) = extract_base_info(req) else {
let Some((key, peer, info, env_vars)) = extract_base_info(req) else {
return Ok(HandleControl::Continue);
};
@@ -155,6 +161,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
cpu_usage_millicores: info.cpu_usage_millicores,
memory_usage_bytes: info.memory_usage_bytes,
hostname: info.hostname,
env_vars,
};
put_into_memory_store(ctx, key, value).await?;
@@ -163,7 +170,9 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
}
}
fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
fn extract_base_info(
request: &HeartbeatRequest,
) -> Option<(NodeInfoKey, Peer, PbNodeInfo, HashMap<String, String>)> {
let HeartbeatRequest { peer, info, .. } = request;
let key = NodeInfoKey::new(request)?;
let Some(peer) = &peer else {
@@ -173,7 +182,17 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P
return None;
};
Some((key, peer.clone(), info.clone()))
let env_vars = EnvVars::from_extensions(&request.extensions)
.inspect_err(|e| {
warn!(e;
"Failed to deserialize __env_vars from heartbeat extensions, peer: {}", peer
);
})
.unwrap_or_default()
.map(|e| e.vars)
.unwrap_or_default();
Some((key, peer.clone(), info.clone(), env_vars))
}
async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {

View File

@@ -88,6 +88,7 @@ impl InformationExtension for StandaloneInformationExtension {
.unwrap_or_default()
.to_string_lossy()
.to_string(),
env_vars: Default::default(),
};
Ok(vec![node_info])
}

View File

@@ -67,6 +67,8 @@ pub struct StandaloneOptions {
pub slow_query: SlowQueryOptions,
pub query: QueryOptions,
pub memory: MemoryOptions,
/// Environment variable keys to read and report in heartbeat messages.
pub heartbeat_env_vars: Vec<String>,
}
impl Default for StandaloneOptions {
@@ -102,13 +104,14 @@ impl Default for StandaloneOptions {
slow_query: SlowQueryOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
heartbeat_env_vars: vec![],
}
}
}
impl Configurable for StandaloneOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
Some(&["heartbeat_env_vars", "wal.broker_endpoints"])
}
}
@@ -141,6 +144,7 @@ impl StandaloneOptions {
logging: cloned_opts.logging,
user_provider: cloned_opts.user_provider,
slow_query: cloned_opts.slow_query,
heartbeat_env_vars: cloned_opts.heartbeat_env_vars.clone(),
..Default::default()
}
}
@@ -157,6 +161,7 @@ impl StandaloneOptions {
init_regions_in_background: cloned_opts.init_regions_in_background,
init_regions_parallelism: cloned_opts.init_regions_parallelism,
query: cloned_opts.query,
heartbeat_env_vars: cloned_opts.heartbeat_env_vars,
..Default::default()
}
}

View File

@@ -1437,6 +1437,7 @@ max_in_flight_write_bytes = "0KiB"
write_bytes_exhausted_policy = "wait"
init_regions_in_background = false
init_regions_parallelism = 16
heartbeat_env_vars = []
[http]
addr = "127.0.0.1:4000"