From 3e45f947cbc4a820e85bedeeb04538925ea2e8f7 Mon Sep 17 00:00:00 2001
From: Suhas Thalanki
Date: Mon, 21 Jul 2025 16:34:12 -0500
Subject: [PATCH] compute.rs changes from hadron

---
 compute_tools/src/compute.rs | 321 +++++++++++++++++++++++++++++++++--
 1 file changed, 306 insertions(+), 15 deletions(-)

diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 3ae946c10e..29f050d572 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1,10 +1,13 @@
 use anyhow::{Context, Result};
+use std::ffi::OsString;
 use chrono::{DateTime, Utc};
 use compute_api::privilege::Privilege;
 use compute_api::responses::{
     ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
     LfcPrewarmState, PromoteState, TlsConfig,
 };
+use compute_api::spec::GenericOption;
+use compute_api::spec::Role;
 use compute_api::spec::{
     ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
 };
@@ -40,7 +43,10 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
 
 use crate::configurator::launch_configurator;
 use crate::disk_quota::set_disk_quota;
+
+use crate::hadron_metrics::COMPUTE_ATTACHED;
 use crate::installed_extensions::get_installed_extensions;
+use crate::logger;
 use crate::logger::startup_context_from_env;
 use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
 use crate::metrics::COMPUTE_CTL_UP;
@@ -113,6 +119,20 @@ pub struct ComputeNodeParams {
 
     /// Interval for installed extensions collection
     pub installed_extensions_collection_interval: Arc<AtomicU64>,
+
+    // Hadron-added params
+    // Path to the compute node spec.json file on the local file system containing the compute
+    // spec, if specified. This is only used in tests.
+    pub spec_path_test_only: Option<OsString>,
+    // Control plane URI and compute ID used to refresh the compute spec after the compute node
+    // detects PS/tenant assignment changes. Passed through from the CLI.
+    pub control_plane_uri: Option<String>,
+    /// Hadron instance ID of the compute node.
+    pub instance_id: Option<String>,
+    /// Timeout of PG compute startup in the Init state.
+    pub pg_init_timeout: Option<Duration>,
+    // Path to the `pg_isready` binary.
+    pub pg_isready_bin: String,
 }
 
 type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -154,6 +174,7 @@ pub struct RemoteExtensionMetrics {
 #[derive(Clone, Debug)]
 pub struct ComputeState {
     pub start_time: DateTime<Utc>,
+    pub pg_start_time: Option<DateTime<Utc>>,
     pub status: ComputeStatus,
     /// Timestamp of the last Postgres activity. It could be `None` if
     /// compute wasn't used since start.
@@ -191,6 +212,7 @@ impl ComputeState {
     pub fn new() -> Self {
         Self {
             start_time: Utc::now(),
+            pg_start_time: None,
            status: ComputeStatus::Empty,
             last_active: None,
             error: None,
@@ -398,6 +420,124 @@ struct StartVmMonitorResult {
     vm_monitor: Option<JoinHandle<Result<()>>>,
 }
 
+
+// BEGIN_HADRON
+/// This function creates roles that are used by Databricks.
+/// These roles need to be bootstrapped at PG compute provisioning time.
+/// The auth methods for these roles are configured in databricks_pg_hba.conf in the universe repository.
+pub(crate) fn create_databricks_roles() -> Vec<String> {
+    let roles = vec![
+        // Role for prometheus_stats_exporter
+        Role {
+            name: "databricks_monitor".to_string(),
+            // This role uses a "local" connection whose auth method is "trust", so no password is needed.
+            encrypted_password: None,
+            options: Some(vec![GenericOption {
+                name: "IN ROLE pg_monitor".to_string(),
+                value: None,
+                vartype: "string".to_string(),
+            }]),
+        },
+        // Role for brickstore control plane
+        Role {
+            name: "databricks_control_plane".to_string(),
+            // Certificate user does not need a password.
+            encrypted_password: None,
+            options: Some(vec![GenericOption {
+                name: "SUPERUSER".to_string(),
+                value: None,
+                vartype: "string".to_string(),
+            }]),
+        },
+        // Role for brickstore httpgateway.
+        Role {
+            name: "databricks_gateway".to_string(),
+            // Certificate user does not need a password.
+            encrypted_password: None,
+            options: None,
+        },
+    ];
+
+    roles
+        .into_iter()
+        .map(|role| {
+            format!(
+                r#"
+            DO $$
+            BEGIN
+                IF NOT EXISTS (
+                    SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}')
+                THEN
+                    CREATE ROLE {} {};
+                END IF;
+            END
+            $$;"#,
+                role.name,
+                role.name.pg_quote(),
+                role.to_pg_options(),
+            )
+        })
+        .collect()
+}
+
+/// Databricks-specific environment variables to be passed to the `postgres` sub-process.
+pub struct DatabricksEnvVars {
+    /// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check
+    /// the token scopes of internal auth tokens.
+    pub endpoint_id: String,
+    /// Hostname of the Databricks workspace URL this compute instance belongs to.
+    /// Used by postgres to verify Databricks PAT tokens.
+    pub workspace_host: String,
+}
+
+impl DatabricksEnvVars {
+    pub fn new(
+        compute_spec: &ComputeSpec,
+        compute_id: Option<String>,
+        instance_id: Option<String>,
+    ) -> Self {
+        let endpoint_id = if let Some(instance_id) = instance_id {
+            // Use instance_id as the endpoint_id if it is set. This code path is for the PuPr model.
+            instance_id
+        } else {
+            // Use compute_id as the endpoint_id if instance_id is not set. This code path is for
+            // the PrPr model. compute_id is a string of the form "{endpoint_id}/{compute_idx}",
+            // where endpoint_id is a UUID; we only need to pass the endpoint_id down to postgres.
+            // Panics if compute_id is not set or not in the expected format.
+            compute_id.unwrap().split('/').next().unwrap().to_string()
+        };
+        let workspace_host = compute_spec
+            .databricks_settings
+            .as_ref()
+            .map(|s| s.databricks_workspace_host.clone())
+            .unwrap_or_default();
+        Self {
+            endpoint_id,
+            workspace_host,
+        }
+    }
+
+    /// Constants for the names of Databricks-specific postgres environment variables.
+    const DATABRICKS_ENDPOINT_ID_ENVVAR: &'static str = "DATABRICKS_ENDPOINT_ID";
+    const DATABRICKS_WORKSPACE_HOST_ENVVAR: &'static str = "DATABRICKS_WORKSPACE_HOST";
+
+    /// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars.
+    /// Consumes `self`.
+    pub fn to_env_var_list(self) -> Vec<(String, String)> {
+        vec![
+            (
+                Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(),
+                self.endpoint_id,
+            ),
+            (
+                Self::DATABRICKS_WORKSPACE_HOST_ENVVAR.to_string(),
+                self.workspace_host,
+            ),
+        ]
+    }
+}
+// END_HADRON
+
 impl ComputeNode {
     pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
         let connstr = params.connstr.as_str();
@@ -504,7 +644,15 @@ impl ComputeNode {
         let mut vm_monitor = None;
         let mut pg_process: Option<PostgresHandle> = None;
 
-        match this.start_compute(&mut pg_process) {
+        // BEGIN HADRON
+        let compute_id = this.params.compute_id.clone();
+        let instance_id: Option<String> = std::env::var("INSTANCE_ID").ok();
+        let hadron_mode = instance_id.is_some();
+        let databricks_env_vars =
+            DatabricksEnvVars::new(&pspec.spec, Some(compute_id), instance_id);
+        // END HADRON
+
+        match this.start_compute(&mut pg_process, databricks_env_vars) {
             Ok(()) => {
                 // Success! Launch remaining services (just vm-monitor currently)
                 vm_monitor =
@@ -558,6 +706,12 @@ impl ComputeNode {
         // Reap the postgres process
         delay_exit |= this.cleanup_after_postgres_exit()?;
 
+        let mut delay_secs = 30;
+        // Shorten the exit delay when compute_ctl is running in Hadron mode.
+        if hadron_mode {
+            delay_secs = 2;
+        }
+
         // /terminate returns LSN. If we don't sleep at all, connection will break and we
         // won't get result. If we sleep too much, tests will take significantly longer
         // and Github Action run will error out
@@ -570,7 +724,8 @@ impl ComputeNode {
         // If launch failed, keep serving HTTP requests for a while, so the cloud
         // control plane can get the actual error.
         if delay_exit {
-            info!("giving control plane 30s to collect the error before shutdown");
+            info!("giving control plane {delay_secs}s to collect the error before shutdown");
+            std::thread::sleep(Duration::from_secs(delay_secs));
         }
         std::thread::sleep(sleep_duration);
         Ok(exit_code)
@@ -617,7 +772,11 @@ impl ComputeNode {
     ///
     /// Note that this is in the critical path of a compute cold start. Keep this fast.
     /// Try to do things concurrently, to hide the latencies.
-    fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
+    fn start_compute(
+        self: &Arc<Self>,
+        pg_handle: &mut Option<PostgresHandle>,
+        databricks_env_vars: DatabricksEnvVars,
+    ) -> Result<()> {
         let compute_state: ComputeState;
 
         let start_compute_span;
@@ -648,6 +807,9 @@ impl ComputeNode {
             };
             _this_entered = start_compute_span.enter();
 
+            // Hadron: record the postgres start time (used to enforce pg_init_timeout).
+            state_guard.pg_start_time.replace(Utc::now());
+
             state_guard.set_status(ComputeStatus::Init, &self.state_changed);
             compute_state = state_guard.clone()
         }
@@ -823,7 +985,8 @@ impl ComputeNode {
 
         ////// START POSTGRES
 
         let start_time = Utc::now();
-        let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
+        let pg_process =
+            self.start_postgres(pspec.storage_auth_token.clone(), databricks_env_vars)?;
         let postmaster_pid = pg_process.pid();
         *pg_handle = Some(pg_process);
@@ -1028,7 +1191,9 @@ impl ComputeNode {
         // If it is something different then create_dir() will error out anyway.
         let pgdata = &self.params.pgdata;
         let _ok = fs::remove_dir_all(pgdata);
-        fs::create_dir(pgdata)?;
+        // Ignore creation errors if the directory already exists (e.g. it was mounted ahead of time).
+        // If it is something different then PG startup will error out anyway.
+        let _ok = fs::create_dir(pgdata);
         fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
 
         Ok(())
@@ -1390,6 +1555,13 @@ impl ComputeNode {
         let pspec = compute_state.pspec.as_ref().expect("spec must be set");
         let spec = &pspec.spec;
         let pgdata_path = Path::new(&self.params.pgdata);
+        let databricks_settings = spec.databricks_settings.as_ref();
+
+        let postgres_port = self
+            .params
+            .connstr
+            .port()
+            .expect("port must be present in connstr");
 
         let tls_config = self.tls_config(&pspec.spec);
 
@@ -1399,8 +1571,10 @@ impl ComputeNode {
             pgdata_path,
             &self.params,
             &pspec.spec,
+            postgres_port,
             self.params.internal_http_port,
             tls_config,
+            databricks_settings,
         )?;
 
         // Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1443,6 +1617,21 @@ impl ComputeNode {
-        // Update pg_hba.conf received with basebackup.
-        update_pg_hba(pgdata_path)?;
+        if let Some(settings) = databricks_settings {
+            copy_tls_certificates(
+                &settings.pg_compute_tls_settings.key_file,
+                &settings.pg_compute_tls_settings.cert_file,
+                pgdata_path,
+            )?;
+
+            // Update pg_hba.conf received with the basebackup, including additional databricks settings.
+            update_pg_hba(pgdata_path, Some(&settings.databricks_pg_hba))?;
+            update_pg_ident(pgdata_path, Some(&settings.databricks_pg_ident))?;
+        } else {
+            // Update pg_hba.conf received with the basebackup.
+            update_pg_hba(pgdata_path, None)?;
+        }
+
         // Place pg_dynshmem under /dev/shm. This allows us to use
         // 'dynamic_shared_memory_type = mmap' so that the files are placed in
         // /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
@@ -1497,6 +1686,13 @@ impl ComputeNode {
 
     /// Start and stop a postgres process to warm up the VM for startup.
     pub fn prewarm_postgres_vm_memory(&self) -> Result<()> {
+        if self.params.instance_id.is_some() {
+            // We are running in Hadron mode. Disable this prewarming step for now, as it could
+            // run into dblet port conflicts and doesn't add much value with our current infra.
+            info!("Skipping postgres prewarming in Hadron mode");
+            return Ok(());
+        }
+
         info!("prewarming VM memory");
 
         // Create pgdata
@@ -1551,17 +1747,25 @@ impl ComputeNode {
     ///
     /// Returns a handle to the child process and a handle to the logs thread.
     #[instrument(skip_all)]
-    pub fn start_postgres(&self, storage_auth_token: Option<String>) -> Result<PostgresHandle> {
+    pub fn start_postgres(
+        &self,
+        storage_auth_token: Option<String>,
+        databricks_env_vars: DatabricksEnvVars,
+    ) -> Result<PostgresHandle> {
         let pgdata_path = Path::new(&self.params.pgdata);
 
+        info!(
+            "Starting Postgres for databricks endpoint id: {}",
+            &databricks_env_vars.endpoint_id
+        );
+
+        let mut env_vars = databricks_env_vars.to_env_var_list();
+        env_vars.extend(storage_auth_token.map(|t| ("NEON_AUTH_TOKEN".to_string(), t)));
+
         // Run postgres as a child process.
-        let mut pg = maybe_cgexec(&self.params.pgbin)
+        let mut pg: std::process::Child = maybe_cgexec(&self.params.pgbin)
             .args(["-D", &self.params.pgdata])
-            .envs(if let Some(storage_auth_token) = &storage_auth_token {
-                vec![("NEON_AUTH_TOKEN", storage_auth_token)]
-            } else {
-                vec![]
-            })
+            .envs(env_vars)
             .stderr(Stdio::piped())
             .spawn()
             .expect("cannot start postgres process");
@@ -1713,7 +1917,12 @@ impl ComputeNode {
     /// Do initial configuration of the already started Postgres.
     #[instrument(skip_all)]
     pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
-        let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
+        let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
+        // Set a 2-minute statement_timeout for the session applying the config. The individual
+        // SQL statements used in apply_spec_sql() should not take long (they just create users
+        // and install extensions). If any of them gets stuck for an extended period of time, it
+        // usually indicates a pageserver connectivity problem and we should bail out.
+        conf.options("-c statement_timeout=2min");
         let conf = Arc::new(conf);
 
         let spec = Arc::new(
@@ -1774,6 +1983,35 @@ impl ComputeNode {
         Ok::<(), anyhow::Error>(())
     }
 
+    // Signal the configurator to refresh the configuration by pulling a new spec from the HCC.
+    // Note that this merely triggers a notification on a condition variable that the configurator
+    // thread waits on. The configurator thread (in configurator.rs) pulls the new spec from the
+    // HCC and applies it.
+    pub async fn signal_refresh_configuration(&self) -> Result<()> {
+        let states_allowing_configuration_refresh = [
+            ComputeStatus::Running,
+            ComputeStatus::Failed,
+            ComputeStatus::RefreshConfigurationPending,
+        ];
+
+        let mut state = self.state.lock().expect("state lock poisoned");
+        if states_allowing_configuration_refresh.contains(&state.status) {
+            state.status = ComputeStatus::RefreshConfigurationPending;
+            self.state_changed.notify_all();
+            Ok(())
+        } else if state.status == ComputeStatus::Init {
+            // If the compute is in the Init state, we can't refresh the configuration
+            // immediately, but we should be able to do so soon.
+            Ok(())
+        } else {
+            Err(anyhow::anyhow!(
+                "Cannot refresh compute configuration in state {:?}",
+                state.status
+            ))
+        }
+    }
+
     // We could've wrapped this around `pg_ctl reload`, but right now we don't use
     // `pg_ctl` for start / stop.
     #[instrument(skip_all)]
@@ -1794,6 +2032,7 @@ impl ComputeNode {
     #[instrument(skip_all)]
     pub fn reconfigure(&self) -> Result<()> {
         let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
+        let databricks_settings = spec.databricks_settings.as_ref();
 
         let tls_config = self.tls_config(&spec);
 
@@ -1843,6 +2082,21 @@ impl ComputeNode {
             tls_config,
         )?;
 
+        let postgres_port = self
+            .params
+            .connstr
+            .port()
+            .expect("port must be present in connstr");
+        config::write_postgres_conf(
+            pgdata_path,
+            &self.params,
+            &spec,
+            postgres_port,
+            self.params.internal_http_port,
+            &self.compute_ctl_config.tls,
+            databricks_settings,
+        )?;
+
         self.pg_reload_conf()?;
 
         if !spec.skip_pg_catalog_updates {
@@ -1949,6 +2203,15 @@ impl ComputeNode {
                 | ComputeStatus::Empty => {
                     state = self.state_changed.wait(state).unwrap();
                 }
+
+                // BEGIN HADRON
+                // Hadron does not currently use TLS in compute_ctl, but adding a wait
+                // here for completeness. The compute-initiated configuration
+                // refresh should be orthogonal to TLS certificate updates for now.
+                ComputeStatus::RefreshConfiguration
+                | ComputeStatus::RefreshConfigurationPending => {
+                    state = self.state_changed.wait(state).unwrap();
+                }
+                // END HADRON
             }
         }
         drop(state);
@@ -2309,7 +2572,7 @@ LIMIT 100",
         if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
             libs_vec = libs
                 .split(&[',', '\'', ' '])
-                .filter(|s| *s != "neon" && !s.is_empty())
+                .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
                 .map(str::to_string)
                 .collect();
         }
@@ -2328,7 +2591,7 @@ LIMIT 100",
         if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
             preload_libs_vec = libs
                 .split(&[',', '\'', ' '])
-                .filter(|s| *s != "neon" && !s.is_empty())
+                .filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
                 .map(str::to_string)
                 .collect();
         }
@@ -2405,6 +2668,34 @@ impl ComputeNode {
         }
     }
 
+    /// Set the compute spec and update related metrics.
+    /// This is the central place where pspec is updated.
+    pub fn set_spec(params: &ComputeNodeParams, state: &mut ComputeState, pspec: ParsedSpec) {
+        state.pspec = Some(pspec);
+        ComputeNode::update_attached_metric(params, state);
+        let _ = logger::update_ids(&params.instance_id, &Some(params.compute_id.clone()));
+    }
+
+    pub fn update_attached_metric(params: &ComputeNodeParams, state: &mut ComputeState) {
+        // Update the pg_cctl_attached gauge when all identifiers are available.
+        if let Some(instance_id) = &params.instance_id {
+            if let Some(pspec) = &state.pspec {
+                // Clear all values in the metric.
+                COMPUTE_ATTACHED.reset();
+
+                // Set the new metric value.
+                COMPUTE_ATTACHED
+                    .with_label_values(&[
+                        &params.compute_id,
+                        instance_id,
+                        &pspec.tenant_id.to_string(),
+                        &pspec.timeline_id.to_string(),
+                    ])
+                    .set(1);
+            }
+        }
+    }
+
     pub fn spawn_extension_stats_task(&self) {
         self.terminate_extension_stats_task();
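
---

Reviewer notes (illustrative sketches, not part of the diff):

The env-var plumbing added to start_postgres() can be exercised in isolation. Below is a minimal, standalone sketch of the pattern: build the Databricks pairs, append NEON_AUTH_TOKEN only when a storage auth token is present, and hand the whole list to Command::envs(). The struct here is a simplified stand-in, not the actual compute_tools type, and `env` is used in place of postgres so the sketch runs anywhere:

    use std::process::{Command, Stdio};

    // Simplified stand-in for the DatabricksEnvVars struct added in this patch.
    struct DatabricksEnvVars {
        endpoint_id: String,
        workspace_host: String,
    }

    impl DatabricksEnvVars {
        // Mirrors to_env_var_list(): consumes self and yields (name, value) pairs.
        fn to_env_var_list(self) -> Vec<(String, String)> {
            vec![
                ("DATABRICKS_ENDPOINT_ID".to_string(), self.endpoint_id),
                ("DATABRICKS_WORKSPACE_HOST".to_string(), self.workspace_host),
            ]
        }
    }

    fn main() {
        let vars = DatabricksEnvVars {
            endpoint_id: "9f6a1d2e".to_string(),                  // hypothetical value
            workspace_host: "example.databricks.com".to_string(), // hypothetical value
        };

        // Same pattern as start_postgres(): NEON_AUTH_TOKEN is appended only if present.
        let storage_auth_token: Option<String> = None;
        let mut env_vars = vars.to_env_var_list();
        env_vars.extend(storage_auth_token.map(|t| ("NEON_AUTH_TOKEN".to_string(), t)));

        // `env` stands in for the postgres child process (requires a Unix-like OS).
        let out = Command::new("env")
            .env_clear()
            .envs(env_vars)
            .stdout(Stdio::piped())
            .output()
            .expect("cannot run env");
        println!("{}", String::from_utf8_lossy(&out.stdout));
    }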
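The state gating in signal_refresh_configuration() reduces to a small transition function: Running/Failed/Pending computes become (or stay) RefreshConfigurationPending, Init is tolerated as a no-op, and everything else is rejected. A runnable sketch of just that logic, with a trimmed-down stand-in enum rather than the real ComputeStatus:

    // Simplified stand-in for the ComputeStatus gating in signal_refresh_configuration().
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum ComputeStatus {
        Init,
        Running,
        Failed,
        RefreshConfigurationPending,
        Terminated,
    }

    fn signal_refresh(status: ComputeStatus) -> Result<ComputeStatus, String> {
        match status {
            // Running/Failed/Pending computes move to (or stay in) the pending state;
            // in the real code the configurator thread is then woken via a condvar.
            ComputeStatus::Running
            | ComputeStatus::Failed
            | ComputeStatus::RefreshConfigurationPending => {
                Ok(ComputeStatus::RefreshConfigurationPending)
            }
            // An Init compute can't refresh yet, but the request is not an error.
            ComputeStatus::Init => Ok(status),
            other => Err(format!("Cannot refresh compute configuration in state {other:?}")),
        }
    }

    fn main() {
        assert_eq!(
            signal_refresh(ComputeStatus::Running),
            Ok(ComputeStatus::RefreshConfigurationPending)
        );
        assert_eq!(signal_refresh(ComputeStatus::Init), Ok(ComputeStatus::Init));
        assert!(signal_refresh(ComputeStatus::Terminated).is_err());
    }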
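For the role bootstrap, the DO $$ block makes the query idempotent: the role is created only if pg_roles has no row for it, so re-running it on every provisioning pass is safe. The sketch below shows the rough shape of one rendered query; it assumes (this is not confirmed by the patch itself) that pg_quote() double-quotes the identifier and that to_pg_options() flattens the GenericOption list into plain keywords such as "IN ROLE pg_monitor":

    // Rough shape of the SQL emitted by create_databricks_roles() for one role.
    // `role_bootstrap_query` is a hypothetical helper for illustration only.
    fn role_bootstrap_query(name: &str, options: &str) -> String {
        format!(
            r#"
        DO $$
        BEGIN
            IF NOT EXISTS (
                SELECT FROM pg_catalog.pg_roles WHERE rolname = '{name}')
            THEN
                CREATE ROLE "{name}" {options};
            END IF;
        END
        $$;"#
        )
    }

    fn main() {
        // Expected effect: databricks_monitor is created once, as a member of pg_monitor,
        // and re-running the query is a no-op thanks to the IF NOT EXISTS guard.
        println!(
            "{}",
            role_bootstrap_query("databricks_monitor", "IN ROLE pg_monitor")
        );
    }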