compute.rs changes from hadron

This commit is contained in:
Suhas Thalanki
2025-07-21 16:34:12 -05:00
parent 0dbe551802
commit 3e45f947cb

View File

@@ -1,10 +1,13 @@
use anyhow::{Context, Result};
use std::ffi::OsString;
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState, PromoteState, TlsConfig,
};
use compute_api::spec::GenericOption;
use compute_api::spec::Role;
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
};
@@ -40,7 +43,10 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use crate::configurator::launch_configurator;
use crate::disk_quota::set_disk_quota;
use crate::hadron_metrics::COMPUTE_ATTACHED;
use crate::installed_extensions::get_installed_extensions;
use crate::logger;
use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
@@ -113,6 +119,20 @@ pub struct ComputeNodeParams {
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: Arc<AtomicU64>,
// Hadron-added params
// Path to the compute node spec.json file on the local file system containing the compute
// spec, if specified. This is only used in tests.
pub spec_path_test_only: Option<OsString>,
// Control plane URI and compute ID used to refresh compute spec after the compute node
// detects PS/tenant assignment changes. Passed through from the CLI.
pub control_plane_uri: Option<String>,
/// Hadron instance ID of the compute node.
pub instance_id: Option<String>,
/// Timeout of PG compute startup in the Init state.
pub pg_init_timeout: Option<Duration>,
// Path to the `pg_isready` binary.
pub pg_isready_bin: String,
}
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -154,6 +174,7 @@ pub struct RemoteExtensionMetrics {
#[derive(Clone, Debug)]
pub struct ComputeState {
pub start_time: DateTime<Utc>,
pub pg_start_time: Option<DateTime<Utc>>,
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
@@ -191,6 +212,7 @@ impl ComputeState {
pub fn new() -> Self {
Self {
start_time: Utc::now(),
pg_start_time: None,
status: ComputeStatus::Empty,
last_active: None,
error: None,
@@ -398,6 +420,124 @@ struct StartVmMonitorResult {
vm_monitor: Option<JoinHandle<Result<()>>>,
}
// BEGIN_HADRON
/// This function creates roles that are used by Databricks.
/// These roles are not needs to be botostrapped at PG Compute provisioning time.
/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository.
pub(crate) fn create_databricks_roles() -> Vec<String> {
let roles = vec![
// Role for prometheus_stats_exporter
Role {
name: "databricks_monitor".to_string(),
// This uses "local" connection and auth method for that is "trust", so no password is needed.
encrypted_password: None,
options: Some(vec![GenericOption {
name: "IN ROLE pg_monitor".to_string(),
value: None,
vartype: "string".to_string(),
}]),
},
// Role for brickstore control plane
Role {
name: "databricks_control_plane".to_string(),
// Certificate user does not need password.
encrypted_password: None,
options: Some(vec![GenericOption {
name: "SUPERUSER".to_string(),
value: None,
vartype: "string".to_string(),
}]),
},
// Role for brickstore httpgateway.
Role {
name: "databricks_gateway".to_string(),
// Certificate user does not need password.
encrypted_password: None,
options: None,
},
];
roles
.into_iter()
.map(|role| {
let query = format!(
r#"
DO $$
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}')
THEN
CREATE ROLE {} {};
END IF;
END
$$;"#,
role.name,
role.name.pg_quote(),
role.to_pg_options(),
);
query
})
.collect()
}
/// Databricks-specific environment variables to be passed to the `postgres` sub-process.
pub struct DatabricksEnvVars {
/// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check
/// the token scopes of internal auth tokens.
pub endpoint_id: String,
/// Hostname of the Databricks workspace URL this compute instance belongs to.
/// Used by postgres to verify Databricks PAT tokens.
pub workspace_host: String,
}
impl DatabricksEnvVars {
pub fn new(
compute_spec: &ComputeSpec,
compute_id: Option<String>,
instance_id: Option<String>,
) -> Self {
let endpoint_id = if let Some(instance_id) = instance_id {
// Use instance_id as endpoint_id if it is set. This code path is for PuPr model.
instance_id
} else {
// Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model.
// compute_id is a string format of "{endpoint_id}/{compute_idx}"
// endpoint_id is a uuid. We only need to pass down endpoint_id to postgres.
// Panics if compute_id is not set or not in the expected format.
compute_id.unwrap().split('/').next().unwrap().to_string()
};
let workspace_host = compute_spec
.databricks_settings
.as_ref()
.map(|s| s.databricks_workspace_host.clone())
.unwrap_or("".to_string());
Self {
endpoint_id,
workspace_host,
}
}
/// Constants for the names of Databricks-specific postgres environment variables.
const DATABRICKS_ENDPOINT_ID_ENVVAR: &'static str = "DATABRICKS_ENDPOINT_ID";
const DATABRICKS_WORKSPACE_HOST_ENVVAR: &'static str = "DATABRICKS_WORKSPACE_HOST";
/// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`.
pub fn to_env_var_list(self) -> Vec<(String, String)> {
vec![
(
Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(),
self.endpoint_id.clone(),
),
(
Self::DATABRICKS_WORKSPACE_HOST_ENVVAR.to_string(),
self.workspace_host.clone(),
),
]
}
}
// END_HADRON
impl ComputeNode {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
@@ -504,7 +644,15 @@ impl ComputeNode {
let mut vm_monitor = None;
let mut pg_process: Option<PostgresHandle> = None;
match this.start_compute(&mut pg_process) {
// BEGIN HADRON
let compute_id = this.params.compute_id.clone();
let instance_id: Option<String> = std::env::var("INSTANCE_ID").ok();
let hadron_mode = instance_id.is_some();
let databricks_env_vars =
DatabricksEnvVars::new(&pspec.spec, Some(compute_id), instance_id);
// END HADRON
match this.start_compute(&mut pg_process, databricks_env_vars) {
Ok(()) => {
// Success! Launch remaining services (just vm-monitor currently)
vm_monitor =
@@ -558,6 +706,12 @@ impl ComputeNode {
// Reap the postgres process
delay_exit |= this.cleanup_after_postgres_exit()?;
let mut delay_secs = 30;
// Shorten delay exit when compute ctl is running in Hadron
if hadron_mode {
delay_secs = 2;
}
// /terminate returns LSN. If we don't sleep at all, connection will break and we
// won't get result. If we sleep too much, tests will take significantly longer
// and Github Action run will error out
@@ -570,7 +724,8 @@ impl ComputeNode {
// If launch failed, keep serving HTTP requests for a while, so the cloud
// control plane can get the actual error.
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
info!("giving control plane {delay_secs}s to collect the error before shutdown");
std::thread::sleep(Duration::from_secs(delay_secs));
}
std::thread::sleep(sleep_duration);
Ok(exit_code)
@@ -617,7 +772,11 @@ impl ComputeNode {
///
/// Note that this is in the critical path of a compute cold start. Keep this fast.
/// Try to do things concurrently, to hide the latencies.
fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
fn start_compute(
self: &Arc<Self>,
pg_handle: &mut Option<PostgresHandle>,
databricks_env_vars: DatabricksEnvVars,
) -> Result<()> {
let compute_state: ComputeState;
let start_compute_span;
@@ -648,6 +807,9 @@ impl ComputeNode {
};
_this_entered = start_compute_span.enter();
// Hadron: Record postgres start time (used to enforce pg_init_timeout).
state_guard.pg_start_time.replace(Utc::now());
state_guard.set_status(ComputeStatus::Init, &self.state_changed);
compute_state = state_guard.clone()
}
@@ -823,7 +985,8 @@ impl ComputeNode {
////// START POSTGRES
let start_time = Utc::now();
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
let pg_process =
self.start_postgres(pspec.storage_auth_token.clone(), databricks_env_vars)?;
let postmaster_pid = pg_process.pid();
*pg_handle = Some(pg_process);
@@ -1028,7 +1191,9 @@ impl ComputeNode {
// If it is something different then create_dir() will error out anyway.
let pgdata = &self.params.pgdata;
let _ok = fs::remove_dir_all(pgdata);
fs::create_dir(pgdata)?;
// Ignore creation errors if the directory already exists (e.g. mounting it ahead of time).
// If it is something different then PG startup will error out anyway.
let _ok = fs::create_dir(pgdata);
fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
Ok(())
@@ -1390,6 +1555,13 @@ impl ComputeNode {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.params.pgdata);
let databricks_settings = spec.databricks_settings.as_ref();
let postgres_port = self
.params
.connstr
.port()
.expect("port must be present in connstr");
let tls_config = self.tls_config(&pspec.spec);
@@ -1399,8 +1571,10 @@ impl ComputeNode {
pgdata_path,
&self.params,
&pspec.spec,
postgres_port,
self.params.internal_http_port,
tls_config,
databricks_settings,
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1443,6 +1617,21 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
if let Some(settings) = databricks_settings {
copy_tls_certificates(
&settings.pg_compute_tls_settings.key_file,
&settings.pg_compute_tls_settings.cert_file,
pgdata_path,
)?;
// Update pg_hba.conf received with basebackup including additional databricks settings.
update_pg_hba(pgdata_path, Some(&settings.databricks_pg_hba))?;
update_pg_ident(pgdata_path, Some(&settings.databricks_pg_ident))?;
} else {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path, None)?;
}
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
// /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
@@ -1497,6 +1686,13 @@ impl ComputeNode {
/// Start and stop a postgres process to warm up the VM for startup.
pub fn prewarm_postgres_vm_memory(&self) -> Result<()> {
if self.params.instance_id.is_some() {
// We are running in Hadron mode. Disabling this prewarming step for now as it could run
// into dblet port conflicts and also doesn't add much value with our current infra.
info!("Skipping postgres prewarming in Hadron mode");
return Ok(());
}
info!("prewarming VM memory");
// Create pgdata
@@ -1551,17 +1747,25 @@ impl ComputeNode {
///
/// Returns a handle to the child process and a handle to the logs thread.
#[instrument(skip_all)]
pub fn start_postgres(&self, storage_auth_token: Option<String>) -> Result<PostgresHandle> {
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
databricks_env_vars: DatabricksEnvVars,
) -> Result<PostgresHandle> {
let pgdata_path = Path::new(&self.params.pgdata);
info!(
"Starting Postgres for databricks endpoint id: {}",
&databricks_env_vars.endpoint_id
);
let mut env_vars = databricks_env_vars.to_env_var_list();
env_vars.extend(storage_auth_token.map(|t| ("NEON_AUTH_TOKEN".to_string(), t)));
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.params.pgbin)
let mut pg: std::process::Child = maybe_cgexec(&self.params.pgbin)
.args(["-D", &self.params.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
})
.envs(env_vars)
.stderr(Stdio::piped())
.spawn()
.expect("cannot start postgres process");
@@ -1713,7 +1917,12 @@ impl ComputeNode {
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
// Set a 2-minute statement_timeout for the session applying config. The individual SQL statements
// used in apply_spec_sql() should not take long (they are just creating users and installing
// extensions). If any of them are stuck for an extended period of time it usually indicates a
// pageserver connectivity problem and we should bail out.
conf.options("-c statement_timeout=2min");
let conf = Arc::new(conf);
let spec = Arc::new(
@@ -1774,6 +1983,35 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}
// Signal to the configurator to refresh the configuration by pulling a new spec from the HCC.
// Note that this merely triggers a notification on a condition variable the configurator thread
// waits on. The configurator thread (in configurator.rs) pulls the new spec from the HCC and
// applies it.
pub async fn signal_refresh_configuration(&self) -> Result<()> {
let states_allowing_configuration_refresh = [
ComputeStatus::Running,
ComputeStatus::Failed,
ComputeStatus::RefreshConfigurationPending,
];
let mut state = self.state.lock().expect("state lock poisoned");
if states_allowing_configuration_refresh.contains(&state.status) {
state.status = ComputeStatus::RefreshConfigurationPending;
self.state_changed.notify_all();
Ok(())
} else if state.status == ComputeStatus::Init {
// If the compute is in Init state, we can't refresh the configuration immediately,
// but we should be able to do that soon.
Ok(())
} else {
Err(anyhow::anyhow!(
"Cannot refresh compute configuration in state {:?}",
state.status
))
}
}
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
@@ -1794,6 +2032,7 @@ impl ComputeNode {
#[instrument(skip_all)]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
let databricks_settings = spec.databricks_settings.as_ref();
let tls_config = self.tls_config(&spec);
@@ -1843,6 +2082,21 @@ impl ComputeNode {
tls_config,
)?;
let postgres_port = self
.params
.connstr
.port()
.expect("port must be present in connstr");
config::write_postgres_conf(
pgdata_path,
&spec,
postgres_port,
self.params.internal_http_port,
&self.compute_ctl_config.tls,
databricks_settings,
)?;
self.pg_reload_conf()?;
if !spec.skip_pg_catalog_updates {
@@ -1949,6 +2203,15 @@ impl ComputeNode {
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
// BEGIN HADRON
// Hadron does not currently use TLS in compute_ctl, but adding a wait
// here for completeness. The compute-initiated configuration
// refresh should be orthogonal to TLS certificate updates for now.
ComputeStatus::RefreshConfiguration
| ComputeStatus::RefreshConfigurationPending => {
state = self.state_changed.wait(state).unwrap();
} // END HADRON"neon
}
}
drop(state);
@@ -2309,7 +2572,7 @@ LIMIT 100",
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.map(str::to_string)
.collect();
}
@@ -2328,7 +2591,7 @@ LIMIT 100",
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
preload_libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.map(str::to_string)
.collect();
}
@@ -2405,6 +2668,34 @@ LIMIT 100",
}
}
/// Set the compute spec and update related metrics.
/// This is the central place where pspec is updated.
pub fn set_spec(params: &ComputeNodeParams, state: &mut ComputeState, pspec: ParsedSpec) {
state.pspec = Some(pspec);
ComputeNode::update_attached_metric(params, state);
let _ = logger::update_ids(&params.instance_id, &Some(params.compute_id.clone()));
}
pub fn update_attached_metric(params: &ComputeNodeParams, state: &mut ComputeState) {
// Update the pg_cctl_attached gauge when all identifiers are available.
if let Some(instance_id) = &params.instance_id {
if let Some(pspec) = &state.pspec {
// Clear all values in the metric
COMPUTE_ATTACHED.reset();
// Set new metric value
COMPUTE_ATTACHED
.with_label_values(&[
&params.compute_id,
instance_id,
&pspec.tenant_id.to_string(),
&pspec.timeline_id.to_string(),
])
.set(1);
}
}
}
pub fn spawn_extension_stats_task(&self) {
self.terminate_extension_stats_task();