Compare commits

...

8 Commits

Author SHA1 Message Date
Suhas Thalanki
657204b474 Merge branch 'thesuhas/add_hadron_metrics' into thesuhas/migrate_compute_rs_hadron 2025-07-22 13:23:52 -05:00
Suhas Thalanki
daf8c70bec added hadron mod 2025-07-22 13:23:38 -05:00
Suhas Thalanki
bae49607df Merge branch 'thesuhas/add_hadron_metrics' into thesuhas/migrate_compute_rs_hadron 2025-07-22 13:20:49 -05:00
Suhas Thalanki
c11a3f7ee7 Merge branch 'main' into thesuhas/migrate_compute_rs_hadron 2025-07-22 11:40:30 -05:00
Suhas Thalanki
32861cb821 Merge branch 'main' into thesuhas/add_hadron_metrics 2025-07-22 11:35:28 -05:00
Suhas Thalanki
b1f53b82a7 Merge branch 'thesuhas/add_hadron_metrics' into thesuhas/migrate_compute_rs_hadron 2025-07-21 16:35:56 -05:00
Suhas Thalanki
104780f225 added hadron metrics 2025-07-21 16:35:34 -05:00
Suhas Thalanki
3e45f947cb compute.rs changes from hadron 2025-07-21 16:34:12 -05:00
3 changed files with 363 additions and 17 deletions

View File

@@ -1,10 +1,13 @@
use anyhow::{Context, Result};
use std::ffi::OsString;
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState, PromoteState, TlsConfig,
};
use compute_api::spec::GenericOption;
use compute_api::spec::Role;
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
};
@@ -40,7 +43,10 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use crate::configurator::launch_configurator;
use crate::disk_quota::set_disk_quota;
use crate::hadron_metrics::COMPUTE_ATTACHED;
use crate::installed_extensions::get_installed_extensions;
use crate::logger;
use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
@@ -118,6 +124,20 @@ pub struct ComputeNodeParams {
pub pg_init_timeout: Option<Duration>,
pub lakebase_mode: bool,
// Hadron-added params
// Path to the compute node spec.json file on the local file system containing the compute
// spec, if specified. This is only used in tests.
pub spec_path_test_only: Option<OsString>,
// Control plane URI and compute ID used to refresh compute spec after the compute node
// detects PS/tenant assignment changes. Passed through from the CLI.
pub control_plane_uri: Option<String>,
/// Hadron instance ID of the compute node.
pub instance_id: Option<String>,
/// Timeout of PG compute startup in the Init state.
pub pg_init_timeout: Option<Duration>,
// Path to the `pg_isready` binary.
pub pg_isready_bin: String,
}
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -405,6 +425,124 @@ struct StartVmMonitorResult {
vm_monitor: Option<JoinHandle<Result<()>>>,
}
// BEGIN_HADRON
/// This function creates roles that are used by Databricks.
/// These roles are not needs to be botostrapped at PG Compute provisioning time.
/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository.
pub(crate) fn create_databricks_roles() -> Vec<String> {
let roles = vec![
// Role for prometheus_stats_exporter
Role {
name: "databricks_monitor".to_string(),
// This uses "local" connection and auth method for that is "trust", so no password is needed.
encrypted_password: None,
options: Some(vec![GenericOption {
name: "IN ROLE pg_monitor".to_string(),
value: None,
vartype: "string".to_string(),
}]),
},
// Role for brickstore control plane
Role {
name: "databricks_control_plane".to_string(),
// Certificate user does not need password.
encrypted_password: None,
options: Some(vec![GenericOption {
name: "SUPERUSER".to_string(),
value: None,
vartype: "string".to_string(),
}]),
},
// Role for brickstore httpgateway.
Role {
name: "databricks_gateway".to_string(),
// Certificate user does not need password.
encrypted_password: None,
options: None,
},
];
roles
.into_iter()
.map(|role| {
let query = format!(
r#"
DO $$
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}')
THEN
CREATE ROLE {} {};
END IF;
END
$$;"#,
role.name,
role.name.pg_quote(),
role.to_pg_options(),
);
query
})
.collect()
}
/// Databricks-specific environment variables to be passed to the `postgres` sub-process.
pub struct DatabricksEnvVars {
/// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check
/// the token scopes of internal auth tokens.
pub endpoint_id: String,
/// Hostname of the Databricks workspace URL this compute instance belongs to.
/// Used by postgres to verify Databricks PAT tokens.
pub workspace_host: String,
}
impl DatabricksEnvVars {
pub fn new(
compute_spec: &ComputeSpec,
compute_id: Option<String>,
instance_id: Option<String>,
) -> Self {
let endpoint_id = if let Some(instance_id) = instance_id {
// Use instance_id as endpoint_id if it is set. This code path is for PuPr model.
instance_id
} else {
// Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model.
// compute_id is a string format of "{endpoint_id}/{compute_idx}"
// endpoint_id is a uuid. We only need to pass down endpoint_id to postgres.
// Panics if compute_id is not set or not in the expected format.
compute_id.unwrap().split('/').next().unwrap().to_string()
};
let workspace_host = compute_spec
.databricks_settings
.as_ref()
.map(|s| s.databricks_workspace_host.clone())
.unwrap_or("".to_string());
Self {
endpoint_id,
workspace_host,
}
}
/// Constants for the names of Databricks-specific postgres environment variables.
const DATABRICKS_ENDPOINT_ID_ENVVAR: &'static str = "DATABRICKS_ENDPOINT_ID";
const DATABRICKS_WORKSPACE_HOST_ENVVAR: &'static str = "DATABRICKS_WORKSPACE_HOST";
/// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`.
pub fn to_env_var_list(self) -> Vec<(String, String)> {
vec![
(
Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(),
self.endpoint_id.clone(),
),
(
Self::DATABRICKS_WORKSPACE_HOST_ENVVAR.to_string(),
self.workspace_host.clone(),
),
]
}
}
// END_HADRON
impl ComputeNode {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
@@ -511,7 +649,15 @@ impl ComputeNode {
let mut vm_monitor = None;
let mut pg_process: Option<PostgresHandle> = None;
match this.start_compute(&mut pg_process) {
// BEGIN HADRON
let compute_id = this.params.compute_id.clone();
let instance_id: Option<String> = std::env::var("INSTANCE_ID").ok();
let hadron_mode = instance_id.is_some();
let databricks_env_vars =
DatabricksEnvVars::new(&pspec.spec, Some(compute_id), instance_id);
// END HADRON
match this.start_compute(&mut pg_process, databricks_env_vars) {
Ok(()) => {
// Success! Launch remaining services (just vm-monitor currently)
vm_monitor =
@@ -565,6 +711,12 @@ impl ComputeNode {
// Reap the postgres process
delay_exit |= this.cleanup_after_postgres_exit()?;
let mut delay_secs = 30;
// Shorten delay exit when compute ctl is running in Hadron
if hadron_mode {
delay_secs = 2;
}
// /terminate returns LSN. If we don't sleep at all, connection will break and we
// won't get result. If we sleep too much, tests will take significantly longer
// and Github Action run will error out
@@ -577,7 +729,8 @@ impl ComputeNode {
// If launch failed, keep serving HTTP requests for a while, so the cloud
// control plane can get the actual error.
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
info!("giving control plane {delay_secs}s to collect the error before shutdown");
std::thread::sleep(Duration::from_secs(delay_secs));
}
std::thread::sleep(sleep_duration);
Ok(exit_code)
@@ -624,7 +777,11 @@ impl ComputeNode {
///
/// Note that this is in the critical path of a compute cold start. Keep this fast.
/// Try to do things concurrently, to hide the latencies.
fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
fn start_compute(
self: &Arc<Self>,
pg_handle: &mut Option<PostgresHandle>,
databricks_env_vars: DatabricksEnvVars,
) -> Result<()> {
let compute_state: ComputeState;
let start_compute_span;
@@ -833,7 +990,8 @@ impl ComputeNode {
////// START POSTGRES
let start_time = Utc::now();
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
let pg_process =
self.start_postgres(pspec.storage_auth_token.clone(), databricks_env_vars)?;
let postmaster_pid = pg_process.pid();
*pg_handle = Some(pg_process);
@@ -1038,7 +1196,9 @@ impl ComputeNode {
// If it is something different then create_dir() will error out anyway.
let pgdata = &self.params.pgdata;
let _ok = fs::remove_dir_all(pgdata);
fs::create_dir(pgdata)?;
// Ignore creation errors if the directory already exists (e.g. mounting it ahead of time).
// If it is something different then PG startup will error out anyway.
let _ok = fs::create_dir(pgdata);
fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
Ok(())
@@ -1400,6 +1560,13 @@ impl ComputeNode {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.params.pgdata);
let databricks_settings = spec.databricks_settings.as_ref();
let postgres_port = self
.params
.connstr
.port()
.expect("port must be present in connstr");
let tls_config = self.tls_config(&pspec.spec);
@@ -1409,8 +1576,10 @@ impl ComputeNode {
pgdata_path,
&self.params,
&pspec.spec,
postgres_port,
self.params.internal_http_port,
tls_config,
databricks_settings,
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1453,6 +1622,21 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path, None)?;
if let Some(settings) = databricks_settings {
copy_tls_certificates(
&settings.pg_compute_tls_settings.key_file,
&settings.pg_compute_tls_settings.cert_file,
pgdata_path,
)?;
// Update pg_hba.conf received with basebackup including additional databricks settings.
update_pg_hba(pgdata_path, Some(&settings.databricks_pg_hba))?;
update_pg_ident(pgdata_path, Some(&settings.databricks_pg_ident))?;
} else {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path, None)?;
}
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
// /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
@@ -1507,6 +1691,13 @@ impl ComputeNode {
/// Start and stop a postgres process to warm up the VM for startup.
pub fn prewarm_postgres_vm_memory(&self) -> Result<()> {
if self.params.instance_id.is_some() {
// We are running in Hadron mode. Disabling this prewarming step for now as it could run
// into dblet port conflicts and also doesn't add much value with our current infra.
info!("Skipping postgres prewarming in Hadron mode");
return Ok(());
}
info!("prewarming VM memory");
// Create pgdata
@@ -1561,17 +1752,25 @@ impl ComputeNode {
///
/// Returns a handle to the child process and a handle to the logs thread.
#[instrument(skip_all)]
pub fn start_postgres(&self, storage_auth_token: Option<String>) -> Result<PostgresHandle> {
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
databricks_env_vars: DatabricksEnvVars,
) -> Result<PostgresHandle> {
let pgdata_path = Path::new(&self.params.pgdata);
info!(
"Starting Postgres for databricks endpoint id: {}",
&databricks_env_vars.endpoint_id
);
let mut env_vars = databricks_env_vars.to_env_var_list();
env_vars.extend(storage_auth_token.map(|t| ("NEON_AUTH_TOKEN".to_string(), t)));
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.params.pgbin)
let mut pg: std::process::Child = maybe_cgexec(&self.params.pgbin)
.args(["-D", &self.params.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
})
.envs(env_vars)
.stderr(Stdio::piped())
.spawn()
.expect("cannot start postgres process");
@@ -1723,7 +1922,12 @@ impl ComputeNode {
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
// Set a 2-minute statement_timeout for the session applying config. The individual SQL statements
// used in apply_spec_sql() should not take long (they are just creating users and installing
// extensions). If any of them are stuck for an extended period of time it usually indicates a
// pageserver connectivity problem and we should bail out.
conf.options("-c statement_timeout=2min");
let conf = Arc::new(conf);
let spec = Arc::new(
@@ -1756,7 +1960,6 @@ impl ComputeNode {
}
// Run migrations separately to not hold up cold starts
let lakebase_mode = self.params.lakebase_mode;
let params = self.params.clone();
tokio::spawn(async move {
let mut conf = conf.as_ref().clone();
@@ -1769,7 +1972,7 @@ impl ComputeNode {
eprintln!("connection error: {e}");
}
});
if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await {
if let Err(e) = handle_migrations(params, &mut client).await {
error!("Failed to run migrations: {}", e);
}
}
@@ -1785,6 +1988,35 @@ impl ComputeNode {
Ok::<(), anyhow::Error>(())
}
// Signal to the configurator to refresh the configuration by pulling a new spec from the HCC.
// Note that this merely triggers a notification on a condition variable the configurator thread
// waits on. The configurator thread (in configurator.rs) pulls the new spec from the HCC and
// applies it.
pub async fn signal_refresh_configuration(&self) -> Result<()> {
let states_allowing_configuration_refresh = [
ComputeStatus::Running,
ComputeStatus::Failed,
ComputeStatus::RefreshConfigurationPending,
];
let mut state = self.state.lock().expect("state lock poisoned");
if states_allowing_configuration_refresh.contains(&state.status) {
state.status = ComputeStatus::RefreshConfigurationPending;
self.state_changed.notify_all();
Ok(())
} else if state.status == ComputeStatus::Init {
// If the compute is in Init state, we can't refresh the configuration immediately,
// but we should be able to do that soon.
Ok(())
} else {
Err(anyhow::anyhow!(
"Cannot refresh compute configuration in state {:?}",
state.status
))
}
}
// Wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
@@ -1805,6 +2037,7 @@ impl ComputeNode {
#[instrument(skip_all)]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
let databricks_settings = spec.databricks_settings.as_ref();
let tls_config = self.tls_config(&spec);
@@ -1854,6 +2087,21 @@ impl ComputeNode {
tls_config,
)?;
let postgres_port = self
.params
.connstr
.port()
.expect("port must be present in connstr");
config::write_postgres_conf(
pgdata_path,
&spec,
postgres_port,
self.params.internal_http_port,
&self.compute_ctl_config.tls,
databricks_settings,
)?;
self.pg_reload_conf()?;
if !spec.skip_pg_catalog_updates {
@@ -1960,6 +2208,15 @@ impl ComputeNode {
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
// BEGIN HADRON
// Hadron does not currently use TLS in compute_ctl, but adding a wait
// here for completeness. The compute-initiated configuration
// refresh should be orthogonal to TLS certificate updates for now.
ComputeStatus::RefreshConfiguration
| ComputeStatus::RefreshConfigurationPending => {
state = self.state_changed.wait(state).unwrap();
} // END HADRON"neon
}
}
drop(state);
@@ -2320,7 +2577,7 @@ LIMIT 100",
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.map(str::to_string)
.collect();
}
@@ -2339,7 +2596,7 @@ LIMIT 100",
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
preload_libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
.map(str::to_string)
.collect();
}
@@ -2416,6 +2673,34 @@ LIMIT 100",
}
}
/// Set the compute spec and update related metrics.
/// This is the central place where pspec is updated.
pub fn set_spec(params: &ComputeNodeParams, state: &mut ComputeState, pspec: ParsedSpec) {
state.pspec = Some(pspec);
ComputeNode::update_attached_metric(params, state);
let _ = logger::update_ids(&params.instance_id, &Some(params.compute_id.clone()));
}
pub fn update_attached_metric(params: &ComputeNodeParams, state: &mut ComputeState) {
// Update the pg_cctl_attached gauge when all identifiers are available.
if let Some(instance_id) = &params.instance_id {
if let Some(pspec) = &state.pspec {
// Clear all values in the metric
COMPUTE_ATTACHED.reset();
// Set new metric value
COMPUTE_ATTACHED
.with_label_values(&[
&params.compute_id,
instance_id,
&pspec.tenant_id.to_string(),
&pspec.timeline_id.to_string(),
])
.set(1);
}
}
}
pub fn spawn_extension_stats_task(&self) {
self.terminate_extension_stats_task();

View File

@@ -0,0 +1,60 @@
use metrics::{
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
register_int_gauge_vec,
};
use once_cell::sync::Lazy;
// Counter keeping track of the number of PageStream request errors reported by Postgres.
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_pagestream_request_errors_total",
"Number of PageStream request errors reported by the postgres process"
)
.expect("failed to define a metric")
});
// Counter keeping track of the number of compute configuration errors due to Postgres statement
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
// stuck in a problematic retry loop when the PS is reject its connection requests (usually due
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
// increases by checking PG and PS logs.
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pg_cctl_configure_statement_timeout_errors_total",
"Number of compute configuration errors due to Postgres statement timeouts."
)
.expect("failed to define a metric")
});
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pg_cctl_attached",
"Compute node attached status (1 if attached)",
&[
"pg_compute_id",
"pg_instance_id",
"tenant_id",
"timeline_id"
]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = Vec::new();
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
metrics.extend(COMPUTE_ATTACHED.collect());
metrics
}
pub fn initialize_metrics() {
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
Lazy::force(&COMPUTE_ATTACHED);
}

View File

@@ -16,6 +16,7 @@ pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod hadron_metrics;
pub mod installed_extensions;
pub mod local_proxy;
pub mod lsn_lease;