From 3cffaedaba033dd3459ae9eb352b060b91dbd39e Mon Sep 17 00:00:00 2001
From: Suhas Thalanki
Date: Tue, 22 Jul 2025 16:02:28 -0500
Subject: [PATCH] added changes from hadron configurator

---
 compute_tools/src/configurator.rs | 170 ++++++++++++++++++++++++++++--
 control_plane/src/endpoint.rs     |  21 +++-
 libs/compute_api/src/responses.rs |  14 +++
 vendor/postgres-v14               |   2 +-
 vendor/postgres-v15               |   2 +-
 vendor/postgres-v16               |   2 +-
 vendor/postgres-v17               |   2 +-
 7 files changed, 198 insertions(+), 15 deletions(-)

diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs
index d97bd37285..2c97912a7e 100644
--- a/compute_tools/src/configurator.rs
+++ b/compute_tools/src/configurator.rs
@@ -1,10 +1,16 @@
-use std::sync::Arc;
+use std::fs::File;
 use std::thread;
+use std::{path::Path, sync::Arc};
 
+use anyhow::Result;
 use compute_api::responses::ComputeStatus;
+use compute_api::spec::ComputeSpec;
 use tracing::{error, info, instrument};
 
-use crate::compute::ComputeNode;
+use crate::{
+    compute::{ComputeNode, ParsedSpec},
+    spec::get_config_from_control_plane,
+};
 
 #[instrument(skip_all)]
 fn configurator_main_loop(compute: &Arc<ComputeNode>) {
@@ -12,12 +18,27 @@
     loop {
         let mut state = compute.state.lock().unwrap();
 
-        // We have to re-check the status after re-acquiring the lock because it could be that
-        // the status has changed while we were waiting for the lock, and we might not need to
-        // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
-        // we are waiting for a condition variable that will never be signaled.
-        if state.status != ComputeStatus::ConfigurationPending {
-            state = compute.state_changed.wait(state).unwrap();
+        if compute.params.lakebase_mode {
+            /* BEGIN_HADRON */
+            // RefreshConfiguration should only be used inside the loop
+            assert_ne!(state.status, ComputeStatus::RefreshConfiguration);
+            /* END_HADRON */
+
+            while state.status != ComputeStatus::ConfigurationPending
+                && state.status != ComputeStatus::RefreshConfigurationPending
+                && state.status != ComputeStatus::Failed
+            {
+                info!("configurator: compute status: {:?}, sleeping", state.status);
+                state = compute.state_changed.wait(state).unwrap();
+            }
+        } else {
+            // We have to re-check the status after re-acquiring the lock because it could be that
+            // the status has changed while we were waiting for the lock, and we might not need to
+            // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
+            // we are waiting for a condition variable that will never be signaled.
+            if state.status != ComputeStatus::ConfigurationPending {
+                state = compute.state_changed.wait(state).unwrap();
+            }
         }
 
         // Re-check the status after waking up
@@ -26,17 +47,146 @@
             state.set_status(ComputeStatus::Configuration, &compute.state_changed);
             drop(state);
 
-            let mut new_status = ComputeStatus::Failed;
+            let mut _new_status = ComputeStatus::Failed;
             if let Err(e) = compute.reconfigure() {
                 error!("could not configure compute node: {}", e);
+                // TODO(BRC-1726): Remove this panic once we fix the state machine to allow further
+                // configuration attempts after a failed configuration attempt.
+ error!("Compute node exiting due to configuration failure."); + std::process::exit(1); } else { - new_status = ComputeStatus::Running; + _new_status = ComputeStatus::Running; info!("compute node configured"); } // XXX: used to test that API is blocking // std::thread::sleep(std::time::Duration::from_millis(10000)); + compute.set_status(_new_status); + } else if state.status == ComputeStatus::RefreshConfigurationPending { + info!( + "compute node suspects its configuration is out of date, now refreshing configuration" + ); + state.set_status(ComputeStatus::RefreshConfiguration, &compute.state_changed); + // Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC. + // This is the only thread that can move compute_ctl out of the `RefreshConfiguration` state, so it + // is safe to drop the lock like this. + drop(state); + + let get_spec_result: anyhow::Result> = + if let Some(sp) = &compute.params.spec_path_test_only { + // This path is only to make testing easier. In production we always get the spec from the HCM. + info!("reloading spec.json from path: {:?}", sp); + let path = Path::new(sp); + if let Ok(file) = File::open(path) { + match serde_json::from_reader(file) { + Ok(spec) => Ok(Some(spec)), + Err(e) => { + error!("could not parse spec file: {}", e); + Err(anyhow::anyhow!("could not parse spec file: {}", e)) + } + } + } else { + error!("could not open spec file at path: {:?}", sp); + Err(anyhow::anyhow!( + "could not open spec file at path: {:?}", + sp + )) + } + } else if let Some(control_plane_uri) = &compute.params.control_plane_uri { + get_config_from_control_plane(control_plane_uri, &compute.params.compute_id).map( + |(spec_opt, _)| { + info!("got spec from control plane: {:?}", spec_opt); + spec_opt + }, + ) + } else { + Err(anyhow::anyhow!("spec_path_test_only is not set")) + }; + + // Parse any received ComputeSpec and transpose the result into a Result>. + let parsed_spec_result: Result> = get_spec_result.and_then(|spec| { + if let Some(spec) = spec { + if let Ok(pspec) = ParsedSpec::try_from(spec) { + Ok(Some(pspec)) + } else { + Err(anyhow::anyhow!("could not parse spec")) + } + } else { + Ok(None) + } + }); + + let new_status: ComputeStatus; + match parsed_spec_result { + // Control plane (HCM) returned a spec and we were able to parse it. + Ok(Some(pspec)) => { + { + let mut state = compute.state.lock().unwrap(); + // Defensive programming to make sure this thread is indeed the only one that can move the compute + // node out of the `RefreshConfiguration` state. Would be nice if we can encode this invariant + // into the type system. + assert_eq!(state.status, ComputeStatus::RefreshConfiguration); + + if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone()) + == Some(pspec.pageserver_connstr.clone()) + { + info!( + "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration." + ); + state.status = ComputeStatus::Running; + compute.state_changed.notify_all(); + drop(state); + std::thread::sleep(std::time::Duration::from_secs(5)); + continue; + } + // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire + // the compute.state lock again so we need to have the lock guard go out of scope here. 
We could add a + // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner, + // but it's not worth forking the codebase too much for this minor point alone right now. + ComputeNode::set_spec(&compute.params, &mut state, pspec); + } + match compute.reconfigure() { + Ok(_) => { + info!("Refresh configuration: compute node configured"); + new_status = ComputeStatus::Running; + } + Err(e) => { + error!( + "Refresh configuration: could not configure compute node: {}", + e + ); + // Set the compute node back to the `RefreshConfigurationPending` state if the configuration + // was not successful. It should be okay to treat this situation the same as if the loop + // hasn't executed yet as long as the detection side keeps notifying. + new_status = ComputeStatus::RefreshConfigurationPending; + } + } + } + // Control plane (HCM)'s response does not contain a spec. This is the "Empty" attachment case. + Ok(None) => { + info!( + "Compute Manager signaled that this compute is no longer attached to any storage. Exiting." + ); + // We just immediately terminate the whole compute_ctl in this case. It's not necessary to attempt a + // clean shutdown as Postgres is probably not responding anyway (which is why we are in this refresh + // configuration state). + std::process::exit(1); + } + // Various error cases: + // - The request to the control plane (HCM) either failed or returned a malformed spec. + // - compute_ctl itself is configured incorrectly (e.g., compute_id is not set). + Err(e) => { + error!( + "Refresh configuration: error getting a parsed spec: {:?}", + e + ); + new_status = ComputeStatus::RefreshConfigurationPending; + // We may be dealing with an overloaded HCM if we end up in this path. Backoff 5 seconds before + // retrying to avoid hammering the HCM. + std::thread::sleep(std::time::Duration::from_secs(5)); + } + } compute.set_status(new_status); } else if state.status == ComputeStatus::Failed { info!("compute node is now in Failed state, exiting"); diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4c569d7005..f3a02bf2d7 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -97,6 +97,8 @@ pub struct EndpointConf { reconfigure_concurrency: usize, drop_subscriptions_before_start: bool, features: Vec, + compute_id: String, + instance_id: Option, cluster: Option, compute_ctl_config: ComputeCtlConfig, privileged_role_name: Option, @@ -199,6 +201,8 @@ impl ComputeControlPlane { mode: ComputeMode, grpc: bool, skip_pg_catalog_updates: bool, + compute_id: &str, + instance_id: Option, drop_subscriptions_before_start: bool, privileged_role_name: Option, ) -> Result> { @@ -236,6 +240,8 @@ impl ComputeControlPlane { grpc, reconfigure_concurrency: 1, features: vec![], + compute_id: compute_id.to_owned(), + instance_id: instance_id.clone(), cluster: None, compute_ctl_config: compute_ctl_config.clone(), privileged_role_name: privileged_role_name.clone(), @@ -258,6 +264,8 @@ impl ComputeControlPlane { drop_subscriptions_before_start, reconfigure_concurrency: 1, features: vec![], + compute_id: compute_id.to_string(), + instance_id: instance_id.clone(), cluster: None, compute_ctl_config, privileged_role_name, @@ -331,6 +339,13 @@ pub struct Endpoint { reconfigure_concurrency: usize, // Feature flags features: Vec, + + // The compute_id is used to identify the compute node in the cloud. 
+    compute_id: String,
+
+    // Hadron database instance id used for PG authentication and logs
+    instance_id: Option<String>,
+
     // Cluster settings
     cluster: Option<Cluster>,
 
@@ -395,6 +410,7 @@ pub struct EndpointStartArgs {
     pub autoprewarm: bool,
     pub offload_lfc_interval_seconds: Option<u64>,
     pub dev: bool,
+    pub pg_init_timeout: Option<Duration>,
 }
 
 impl Endpoint {
@@ -437,6 +453,8 @@ impl Endpoint {
             reconfigure_concurrency: conf.reconfigure_concurrency,
             drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
             features: conf.features,
+            compute_id: conf.compute_id,
+            instance_id: conf.instance_id,
             cluster: conf.cluster,
             compute_ctl_config: conf.compute_ctl_config,
             privileged_role_name: conf.privileged_role_name,
@@ -481,7 +499,7 @@ impl Endpoint {
         conf.append("restart_after_crash", "off");
 
         // Load the 'neon' extension
-        conf.append("shared_preload_libraries", "neon");
+        conf.append("shared_preload_libraries", "neon, databricks_auth");
         conf.append_line("");
 
         // Replication-related configurations, such as WAL sending
@@ -785,6 +803,7 @@ impl Endpoint {
             shard_stripe_size: Some(args.shard_stripe_size),
             local_proxy_config: None,
             reconfigure_concurrency: self.reconfigure_concurrency,
+            databricks_settings: None,
             drop_subscriptions_before_start: self.drop_subscriptions_before_start,
             audit_log_level: ComputeAudit::Disabled,
             logs_export_host: None::<String>,
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index 5b8fc49750..0a1fb7e90e 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -173,6 +173,11 @@ pub enum ComputeStatus {
     TerminationPendingImmediate,
     // Terminated Postgres
     Terminated,
+    // A spec refresh is being requested
+    RefreshConfigurationPending,
+    // A spec refresh is being applied. We cannot refresh configuration again until the current
+    // refresh is done, i.e., signal_refresh_configuration() will return a 500 error.
+    RefreshConfiguration,
 }
 
 #[derive(Deserialize, Serialize)]
@@ -185,6 +190,10 @@ impl Display for ComputeStatus {
         match self {
             ComputeStatus::Empty => f.write_str("empty"),
             ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
+            ComputeStatus::RefreshConfigurationPending => {
+                f.write_str("refresh-configuration-pending")
+            }
+            ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
             ComputeStatus::Init => f.write_str("init"),
             ComputeStatus::Running => f.write_str("running"),
             ComputeStatus::Configuration => f.write_str("configuration"),
@@ -285,10 +294,15 @@ pub struct TlsConfig {
 }
 
 /// Response of the `/computes/{compute_id}/spec` control-plane API.
+/// This is not actually a compute API response, so consider moving
+/// to a different place.
 #[derive(Deserialize, Debug)]
 pub struct ControlPlaneConfigResponse {
     pub spec: Option<ComputeSpec>,
     pub status: ControlPlaneComputeStatus,
+    // Hadron: Deserialize this field into a harmless default if
+    // compute_ctl_config is not present for compatibility.
+    #[serde(default)]
     pub compute_ctl_config: ComputeCtlConfig,
 }
 
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index c9f9fdd011..4cacada8bd 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit c9f9fdd0113b52c0bd535afdb09d3a543aeee25f
+Subproject commit 4cacada8bde7f6424751a0727a657783c6a1d20b
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index aaaeff2550..e5ee23d998 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit aaaeff2550d5deba58847f112af9b98fa3a58b00
+Subproject commit e5ee23d99874ea9f5b62f8acc7d076162ae95d6c
diff --git a/vendor/postgres-v16 b/vendor/postgres-v16
index 9b9cb4b3e3..ad2b69b582 160000
--- a/vendor/postgres-v16
+++ b/vendor/postgres-v16
@@ -1 +1 @@
-Subproject commit 9b9cb4b3e33347aea8f61e606bb6569979516de5
+Subproject commit ad2b69b58230290fc44c08fbe0c97981c64f6c7d
diff --git a/vendor/postgres-v17 b/vendor/postgres-v17
index fa1788475e..ba750903a9 160000
--- a/vendor/postgres-v17
+++ b/vendor/postgres-v17
@@ -1 +1 @@
-Subproject commit fa1788475e3146cc9c7c6a1b74f48fd296898fcd
+Subproject commit ba750903a90dded8098f2f56d0b2a9012e6166af
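
Reviewer note, not part of the patch: the diff above adds two ComputeStatus variants (RefreshConfigurationPending, RefreshConfiguration) and a refresh branch to configurator_main_loop. The Rust sketch below is only an illustrative summary of the resulting status transitions; RefreshOutcome and next_status are hypothetical names introduced here and do not appear in the patch.

use compute_api::responses::ComputeStatus;

// Hypothetical summary of one pass through the refresh branch; the patch
// itself branches inline instead of using an enum like this.
enum RefreshOutcome {
    Reconfigured,      // compute.reconfigure() succeeded with the new spec
    SpecUnchanged,     // HCM returned the same pageserver connstr; wait and retry
    ReconfigureFailed, // compute.reconfigure() returned an error
    SpecFetchFailed,   // HCM request failed or the spec could not be parsed
    Detached,          // HCM reported an empty attachment
}

// Status the loop settles on after one refresh attempt. `None` stands in for
// the std::process::exit(1) taken when the compute is no longer attached.
fn next_status(outcome: RefreshOutcome) -> Option<ComputeStatus> {
    match outcome {
        RefreshOutcome::Reconfigured | RefreshOutcome::SpecUnchanged => {
            Some(ComputeStatus::Running)
        }
        RefreshOutcome::ReconfigureFailed | RefreshOutcome::SpecFetchFailed => {
            Some(ComputeStatus::RefreshConfigurationPending)
        }
        RefreshOutcome::Detached => None,
    }
}

In words: a successful reconfigure (or an unchanged spec) returns the node to Running; fetch, parse, or reconfigure failures fall back to RefreshConfigurationPending so the loop retries after a short backoff; an empty attachment terminates compute_ctl.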