From 42aa58aa270bcd7fb7e99ac784b1defcf4851275 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 8 May 2026 15:37:53 +0800 Subject: [PATCH] feat: support env vars in heartbeat (#8064) * feat: support reporting env vars in heartbeat messages to metasrv Add `heartbeat_env_vars` config option for datanode and frontend. When configured, the specified environment variable values are read at startup and sent to metasrv in every heartbeat via the `extensions` map. Metasrv extracts and stores them in `NodeInfo` for use in routing decisions (e.g. AZ-aware region placement). - Add `EnvVars` helper in `common/meta/src/datanode.rs` following the existing `GcStat` extension pattern with `into_extensions`/`from_extensions` - Add `env_vars: HashMap` field to `NodeInfo` in `common/meta/src/cluster.rs` with `#[serde(default)]` for backward compat - Add `heartbeat_env_vars: Vec` config field to `DatanodeOptions`, `FrontendOptions`, and `StandaloneOptions` - Inject env vars into heartbeat `extensions` in both datanode and frontend heartbeat tasks (`datanode/src/heartbeat.rs`, `frontend/src/heartbeat.rs`) - Extract env vars from `req.extensions` in all three metasrv `CollectXxxClusterInfoHandler`s - Update `NodeInfo` construction sites in `meta-client`, `discovery/lease.rs`, and `standalone/information_extension.rs` - Update expected TOML output in `tests-integration/tests/http.rs` - Add unit tests for `EnvVars` round-trip and `NodeInfo` backward compat Signed-off-by: Lei, HUANG Signed-off-by: Lei, HUANG * refactor: address heartbeat env review feedback Signed-off-by: Lei, HUANG * chore: log error on deserialization failure Signed-off-by: Lei, HUANG * refactor: send heartbeat env vars once Signed-off-by: Lei, HUANG * fix: resend heartbeat env vars after reconnect Signed-off-by: Lei, HUANG * revert: keep env vars in every heartbeat Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG Signed-off-by: Lei, HUANG --- src/cmd/tests/load_config_test.rs | 24 +++++- src/common/meta/src/cluster.rs | 61 +++++++++++++++ src/common/meta/src/datanode.rs | 74 ++++++++++++++++++- src/datanode/src/config.rs | 11 ++- src/datanode/src/heartbeat.rs | 10 ++- src/frontend/src/frontend.rs | 5 +- src/frontend/src/heartbeat.rs | 8 ++ src/meta-client/src/client.rs | 4 + src/meta-client/src/client/util.rs | 1 + src/meta-srv/src/discovery/lease.rs | 4 + .../handler/collect_cluster_info_handler.rs | 29 ++++++-- src/standalone/src/information_extension.rs | 1 + src/standalone/src/options.rs | 7 +- tests-integration/tests/http.rs | 1 + 14 files changed, 227 insertions(+), 13 deletions(-) diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 2300a2250e..8478c08b53 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -16,7 +16,7 @@ use std::time::Duration; use cmd::options::GreptimeOptions; use common_base::memory_limit::MemoryLimit; -use common_config::{Configurable, DEFAULT_DATA_HOME}; +use common_config::{Configurable, DEFAULT_DATA_HOME, ENV_VAR_SEP}; use common_options::datanode::{ClientOptions, DatanodeClientOptions}; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions}; use common_wal::config::DatanodeWalConfig; @@ -311,3 +311,25 @@ fn test_load_standalone_example_config() { }; similar_asserts::assert_eq!(options, expected); } + +#[test] +fn test_load_heartbeat_env_vars_from_env() { + let env_prefix = "HEARTBEAT_ENV_VARS_UT"; + let env_key = [env_prefix, "HEARTBEAT_ENV_VARS"].join(ENV_VAR_SEP); + + temp_env::with_var(env_key, Some("AZ,REGION"), || { + let expected = vec!["AZ".to_string(), "REGION".to_string()]; + + let datanode = + GreptimeOptions::::load_layered_options(None, env_prefix).unwrap(); + similar_asserts::assert_eq!(datanode.component.heartbeat_env_vars, expected); + + let frontend = + GreptimeOptions::::load_layered_options(None, env_prefix).unwrap(); + similar_asserts::assert_eq!(frontend.component.heartbeat_env_vars, expected); + + let standalone = + GreptimeOptions::::load_layered_options(None, env_prefix).unwrap(); + similar_asserts::assert_eq!(standalone.component.heartbeat_env_vars, expected); + }); +} diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 40ee8b0e2f..5ab2b87399 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::hash::{DefaultHasher, Hash, Hasher}; use std::str::FromStr; @@ -134,6 +135,9 @@ pub struct NodeInfo { // The node build hostname #[serde(default)] pub hostname: String, + /// Environment variables reported by the node. + #[serde(default)] + pub env_vars: HashMap, } #[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)] @@ -355,6 +359,7 @@ mod tests { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "test_hostname".to_string(), + env_vars: Default::default(), }; let node_info_bytes: Vec = node_info.try_into().unwrap(); @@ -451,6 +456,7 @@ mod tests { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "test_hostname".to_string(), + env_vars: Default::default(), }; let node_info_bytes: Vec = node_info.try_into().unwrap(); @@ -464,4 +470,59 @@ mod tests { } if workloads.types == vec![7] ); } + + #[test] + fn test_node_info_backward_compatible_without_env_vars() { + // Simulate a NodeInfo serialized before env_vars was added + let raw = r#"{ + "peer":{"id":1,"addr":"127.0.0.1"}, + "last_activity_ts":123, + "status":{"Datanode":{"rcus":0,"wcus":0,"leader_regions":0,"follower_regions":0,"workloads":{"types":[0]}}}, + "version":"", + "git_commit":"", + "start_time_ms":1, + "total_cpu_millicores":0, + "total_memory_bytes":0, + "cpu_usage_millicores":0, + "memory_usage_bytes":0, + "hostname":"test" + }"#; + + let node_info: NodeInfo = raw.parse().unwrap(); + assert!(node_info.env_vars.is_empty()); + } + + #[test] + fn test_node_info_with_env_vars_round_trip() { + let mut env_vars = HashMap::new(); + env_vars.insert("AZ".to_string(), "us-east-1a".to_string()); + + let node_info = NodeInfo { + peer: Peer { + id: 1, + addr: "127.0.0.1".to_string(), + }, + last_activity_ts: 123, + status: NodeStatus::Datanode(DatanodeStatus { + rcus: 0, + wcus: 0, + leader_regions: 0, + follower_regions: 0, + workloads: DatanodeWorkloads { types: vec![] }, + }), + version: "".to_string(), + git_commit: "".to_string(), + start_time_ms: 1, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + hostname: "test".to_string(), + env_vars, + }; + + let node_info_bytes: Vec = node_info.try_into().unwrap(); + let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap(); + assert_eq!(new_node_info.env_vars.get("AZ").unwrap(), "us-east-1a"); + } } diff --git a/src/common/meta/src/datanode.rs b/src/common/meta/src/datanode.rs index d6c6229801..f3d3a4bed1 100644 --- a/src/common/meta/src/datanode.rs +++ b/src/common/meta/src/datanode.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader}; @@ -371,6 +371,48 @@ impl GcStat { } } +/// Environment variables reported by a node in heartbeat messages. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct EnvVars { + pub vars: HashMap, +} + +impl EnvVars { + pub const ENV_VARS_KEY: &str = "__env_vars"; + + pub fn new(vars: HashMap) -> Self { + Self { vars } + } + + /// Read the configured env var keys from the environment and build an EnvVars. + pub fn from_config(keys: &[String]) -> Self { + let vars = keys + .iter() + .filter_map(|key| std::env::var(key).ok().map(|value| (key.clone(), value))) + .collect(); + Self { vars } + } + + pub fn into_extensions(&self, extensions: &mut HashMap>) { + if self.vars.is_empty() { + return; + } + let bytes = serde_json::to_vec(self).unwrap_or_default(); + extensions.insert(Self::ENV_VARS_KEY.to_string(), bytes); + } + + pub fn from_extensions(extensions: &HashMap>) -> Result> { + extensions + .get(Self::ENV_VARS_KEY) + .map(|bytes| { + serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu { + input: String::from_utf8_lossy(bytes).to_string(), + }) + }) + .transpose() + } +} + /// The key of the datanode stat in the memory store. /// /// The format is `__meta_datanode_stat-0-{node_id}`. @@ -596,4 +638,34 @@ mod tests { assert_eq!(stat.region_stats.len(), 1); assert_eq!(stat.region_stats[0].role, RegionRole::StagingLeader); } + + #[test] + fn test_env_vars_round_trip() { + let mut vars = HashMap::new(); + vars.insert("AZ".to_string(), "us-east-1a".to_string()); + vars.insert("REGION".to_string(), "us-east-1".to_string()); + let env_vars = EnvVars::new(vars); + + let mut extensions = HashMap::new(); + env_vars.into_extensions(&mut extensions); + + let extracted = EnvVars::from_extensions(&extensions).unwrap().unwrap(); + assert_eq!(extracted.vars.get("AZ").unwrap(), "us-east-1a"); + assert_eq!(extracted.vars.get("REGION").unwrap(), "us-east-1"); + } + + #[test] + fn test_env_vars_empty_not_written() { + let env_vars = EnvVars::default(); + let mut extensions = HashMap::new(); + env_vars.into_extensions(&mut extensions); + assert!(extensions.is_empty()); + } + + #[test] + fn test_env_vars_from_extensions_missing() { + let extensions = HashMap::new(); + let result = EnvVars::from_extensions(&extensions).unwrap(); + assert!(result.is_none()); + } } diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index f8d8beb9cf..2ce306006b 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -83,6 +83,10 @@ pub struct DatanodeOptions { pub query: QueryOptions, pub memory: MemoryOptions, + /// Environment variable keys to read and report in heartbeat messages. + /// The values of these env vars at startup will be sent to metasrv. + pub heartbeat_env_vars: Vec, + /// Deprecated options, please use the new options instead. #[deprecated(note = "Please use `grpc.bind_addr` instead.")] pub rpc_addr: Option, @@ -136,6 +140,7 @@ impl Default for DatanodeOptions { tracing: TracingOptions::default(), query: QueryOptions::default(), memory: MemoryOptions::default(), + heartbeat_env_vars: vec![], // Deprecated options rpc_addr: None, @@ -149,7 +154,11 @@ impl Default for DatanodeOptions { impl Configurable for DatanodeOptions { fn env_list_keys() -> Option<&'static [&'static str]> { - Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"]) + Some(&[ + "heartbeat_env_vars", + "meta_client.metasrv_addrs", + "wal.broker_endpoints", + ]) } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index fe8866b7f9..3d77251953 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -21,7 +21,7 @@ use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; use common_base::Plugins; use common_meta::cache_invalidator::CacheInvalidatorRef; -use common_meta::datanode::REGION_STATISTIC_KEY; +use common_meta::datanode::{EnvVars, REGION_STATISTIC_KEY}; use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -66,6 +66,7 @@ pub struct HeartbeatTask { resp_handler_executor: HeartbeatResponseHandlerExecutorRef, region_alive_keeper: Arc, resource_stat: ResourceStatRef, + env_vars: EnvVars, } impl Drop for HeartbeatTask { @@ -114,6 +115,7 @@ impl HeartbeatTask { resp_handler_executor, region_alive_keeper, resource_stat, + env_vars: EnvVars::from_config(&opts.heartbeat_env_vars), }) } @@ -258,6 +260,8 @@ impl HeartbeatTask { .mito_engine() .context(RegionEngineNotFoundSnafu { name: "mito" })? .gc_limiter(); + let mut env_var_extensions = HashMap::new(); + self.env_vars.into_extensions(&mut env_var_extensions); common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); @@ -300,7 +304,7 @@ impl HeartbeatTask { if let Some(message) = message { match outgoing_message_to_mailbox_message(message) { Ok(message) => { - let mut extensions = heartbeat_request.extensions.clone(); + let mut extensions = env_var_extensions.clone(); let gc_stat = gc_limiter.gc_stat(); gc_stat.into_extensions(&mut extensions); @@ -328,7 +332,7 @@ impl HeartbeatTask { let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; - let mut extensions = heartbeat_request.extensions.clone(); + let mut extensions = env_var_extensions.clone(); let gc_stat = gc_limiter.gc_stat(); gc_stat.into_extensions(&mut extensions); diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 69ae59517e..6e862fdce4 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -72,6 +72,8 @@ pub struct FrontendOptions { pub memory: MemoryOptions, /// The event recorder options. pub event_recorder: EventRecorderOptions, + /// Environment variable keys to read and report in heartbeat messages. + pub heartbeat_env_vars: Vec, } impl Default for FrontendOptions { @@ -101,13 +103,14 @@ impl Default for FrontendOptions { slow_query: SlowQueryOptions::default(), memory: MemoryOptions::default(), event_recorder: EventRecorderOptions::default(), + heartbeat_env_vars: vec![], } } } impl Configurable for FrontendOptions { fn env_list_keys() -> Option<&'static [&'static str]> { - Some(&["meta_client.metasrv_addrs"]) + Some(&["heartbeat_env_vars", "meta_client.metasrv_addrs"]) } } diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index add4940214..937c74be19 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -18,6 +18,7 @@ mod tests; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer}; +use common_meta::datanode::EnvVars; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; @@ -45,6 +46,7 @@ pub struct HeartbeatTask { resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, resource_stat: ResourceStatRef, + env_vars: EnvVars, } impl HeartbeatTask { @@ -67,6 +69,7 @@ impl HeartbeatTask { resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, resource_stat, + env_vars: EnvVars::from_config(&opts.heartbeat_env_vars), } } @@ -202,10 +205,14 @@ impl HeartbeatTask { let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores(); let total_memory_bytes = self.resource_stat.get_total_memory_bytes(); let resource_stat = self.resource_stat.clone(); + let env_vars = self.env_vars.clone(); common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); + let mut extensions = std::collections::HashMap::new(); + env_vars.into_extensions(&mut extensions); + let heartbeat_request = HeartbeatRequest { peer: self_peer, info: Self::build_node_info( @@ -213,6 +220,7 @@ impl HeartbeatTask { total_cpu_millicores, total_memory_bytes, ), + extensions, ..Default::default() }; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 205226c845..f4bdca684c 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -460,6 +460,7 @@ impl ClusterInfo for MetaClient { cpu_usage_millicores: node_info.cpu_usage_millicores, memory_usage_bytes: node_info.memory_usage_bytes, hostname: node_info.hostname, + env_vars: Default::default(), } } else { // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto. @@ -475,6 +476,7 @@ impl ClusterInfo for MetaClient { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "".to_string(), + env_vars: Default::default(), } } }) @@ -492,6 +494,7 @@ impl ClusterInfo for MetaClient { cpu_usage_millicores: node_info.cpu_usage_millicores, memory_usage_bytes: node_info.memory_usage_bytes, hostname: node_info.hostname, + env_vars: Default::default(), } } else { // TODO(zyy17): It's for backward compatibility. Remove this when the deprecated fields are removed from the proto. @@ -507,6 +510,7 @@ impl ClusterInfo for MetaClient { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "".to_string(), + env_vars: Default::default(), } } })) diff --git a/src/meta-client/src/client/util.rs b/src/meta-client/src/client/util.rs index 13f2135039..aa08871e04 100644 --- a/src/meta-client/src/client/util.rs +++ b/src/meta-client/src/client/util.rs @@ -169,6 +169,7 @@ mod tests { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: String::new(), + env_vars: Default::default(), } } diff --git a/src/meta-srv/src/discovery/lease.rs b/src/meta-srv/src/discovery/lease.rs index b8ca8a0ebb..9fcc52f423 100644 --- a/src/meta-srv/src/discovery/lease.rs +++ b/src/meta-srv/src/discovery/lease.rs @@ -379,6 +379,7 @@ mod tests { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "test_hostname".to_string(), + env_vars: Default::default(), }; let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); @@ -407,6 +408,7 @@ mod tests { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "test_hostname".to_string(), + env_vars: Default::default(), }; in_memory @@ -450,6 +452,7 @@ mod tests { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "test_hostname".to_string(), + env_vars: Default::default(), }; let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); in_memory @@ -492,6 +495,7 @@ mod tests { cpu_usage_millicores: 0, memory_usage_bytes: 0, hostname: "test_hostname".to_string(), + env_vars: Default::default(), }; let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index 09286c5836..c4d73477b6 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role}; use common_meta::cluster::{ DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, }; +use common_meta::datanode::EnvVars; use common_meta::heartbeat::utils::get_flownode_workloads; use common_meta::peer::Peer; use common_meta::rpc::store::PutRequest; +use common_telemetry::warn; use snafu::ResultExt; use store_api::region_engine::RegionRole; @@ -42,7 +46,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { ctx: &mut Context, _acc: &mut HeartbeatAccumulator, ) -> Result { - let Some((key, peer, info)) = extract_base_info(req) else { + let Some((key, peer, info, env_vars)) = extract_base_info(req) else { return Ok(HandleControl::Continue); }; @@ -58,6 +62,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { cpu_usage_millicores: info.cpu_usage_millicores, memory_usage_bytes: info.memory_usage_bytes, hostname: info.hostname, + env_vars, }; put_into_memory_store(ctx, key, value).await?; @@ -80,7 +85,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler { ctx: &mut Context, _acc: &mut HeartbeatAccumulator, ) -> Result { - let Some((key, peer, info)) = extract_base_info(req) else { + let Some((key, peer, info, env_vars)) = extract_base_info(req) else { return Ok(HandleControl::Continue); }; let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref()); @@ -99,6 +104,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler { cpu_usage_millicores: info.cpu_usage_millicores, memory_usage_bytes: info.memory_usage_bytes, hostname: info.hostname, + env_vars, }; put_into_memory_store(ctx, key, value).await?; @@ -122,7 +128,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result { - let Some((key, peer, info)) = extract_base_info(req) else { + let Some((key, peer, info, env_vars)) = extract_base_info(req) else { return Ok(HandleControl::Continue); }; @@ -155,6 +161,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { cpu_usage_millicores: info.cpu_usage_millicores, memory_usage_bytes: info.memory_usage_bytes, hostname: info.hostname, + env_vars, }; put_into_memory_store(ctx, key, value).await?; @@ -163,7 +170,9 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler { } } -fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> { +fn extract_base_info( + request: &HeartbeatRequest, +) -> Option<(NodeInfoKey, Peer, PbNodeInfo, HashMap)> { let HeartbeatRequest { peer, info, .. } = request; let key = NodeInfoKey::new(request)?; let Some(peer) = &peer else { @@ -173,7 +182,17 @@ fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, P return None; }; - Some((key, peer.clone(), info.clone())) + let env_vars = EnvVars::from_extensions(&request.extensions) + .inspect_err(|e| { + warn!(e; + "Failed to deserialize __env_vars from heartbeat extensions, peer: {}", peer + ); + }) + .unwrap_or_default() + .map(|e| e.vars) + .unwrap_or_default(); + + Some((key, peer.clone(), info.clone(), env_vars)) } async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> { diff --git a/src/standalone/src/information_extension.rs b/src/standalone/src/information_extension.rs index fb3bc3e5fb..a7de19e48a 100644 --- a/src/standalone/src/information_extension.rs +++ b/src/standalone/src/information_extension.rs @@ -88,6 +88,7 @@ impl InformationExtension for StandaloneInformationExtension { .unwrap_or_default() .to_string_lossy() .to_string(), + env_vars: Default::default(), }; Ok(vec![node_info]) } diff --git a/src/standalone/src/options.rs b/src/standalone/src/options.rs index 35915742a7..dece6389f0 100644 --- a/src/standalone/src/options.rs +++ b/src/standalone/src/options.rs @@ -67,6 +67,8 @@ pub struct StandaloneOptions { pub slow_query: SlowQueryOptions, pub query: QueryOptions, pub memory: MemoryOptions, + /// Environment variable keys to read and report in heartbeat messages. + pub heartbeat_env_vars: Vec, } impl Default for StandaloneOptions { @@ -102,13 +104,14 @@ impl Default for StandaloneOptions { slow_query: SlowQueryOptions::default(), query: QueryOptions::default(), memory: MemoryOptions::default(), + heartbeat_env_vars: vec![], } } } impl Configurable for StandaloneOptions { fn env_list_keys() -> Option<&'static [&'static str]> { - Some(&["wal.broker_endpoints"]) + Some(&["heartbeat_env_vars", "wal.broker_endpoints"]) } } @@ -141,6 +144,7 @@ impl StandaloneOptions { logging: cloned_opts.logging, user_provider: cloned_opts.user_provider, slow_query: cloned_opts.slow_query, + heartbeat_env_vars: cloned_opts.heartbeat_env_vars.clone(), ..Default::default() } } @@ -157,6 +161,7 @@ impl StandaloneOptions { init_regions_in_background: cloned_opts.init_regions_in_background, init_regions_parallelism: cloned_opts.init_regions_parallelism, query: cloned_opts.query, + heartbeat_env_vars: cloned_opts.heartbeat_env_vars, ..Default::default() } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e6c7aefe26..f270540035 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1437,6 +1437,7 @@ max_in_flight_write_bytes = "0KiB" write_bytes_exhausted_policy = "wait" init_regions_in_background = false init_regions_parallelism = 16 +heartbeat_env_vars = [] [http] addr = "127.0.0.1:4000"