mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 20:40:37 +00:00
Compare commits
8 Commits
diko/safek
...
thesuhas/m
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
657204b474 | ||
|
|
daf8c70bec | ||
|
|
bae49607df | ||
|
|
c11a3f7ee7 | ||
|
|
32861cb821 | ||
|
|
b1f53b82a7 | ||
|
|
104780f225 | ||
|
|
3e45f947cb |
@@ -1,10 +1,13 @@
|
||||
use anyhow::{Context, Result};
|
||||
use std::ffi::OsString;
|
||||
use chrono::{DateTime, Utc};
|
||||
use compute_api::privilege::Privilege;
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
|
||||
LfcPrewarmState, PromoteState, TlsConfig,
|
||||
};
|
||||
use compute_api::spec::GenericOption;
|
||||
use compute_api::spec::Role;
|
||||
use compute_api::spec::{
|
||||
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
|
||||
};
|
||||
@@ -40,7 +43,10 @@ use utils::shard::{ShardCount, ShardIndex, ShardNumber};
|
||||
|
||||
use crate::configurator::launch_configurator;
|
||||
use crate::disk_quota::set_disk_quota;
|
||||
|
||||
use crate::hadron_metrics::COMPUTE_ATTACHED;
|
||||
use crate::installed_extensions::get_installed_extensions;
|
||||
use crate::logger;
|
||||
use crate::logger::startup_context_from_env;
|
||||
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use crate::metrics::COMPUTE_CTL_UP;
|
||||
@@ -118,6 +124,20 @@ pub struct ComputeNodeParams {
|
||||
pub pg_init_timeout: Option<Duration>,
|
||||
|
||||
pub lakebase_mode: bool,
|
||||
|
||||
// Hadron-added params
|
||||
// Path to the compute node spec.json file on the local file system containing the compute
|
||||
// spec, if specified. This is only used in tests.
|
||||
pub spec_path_test_only: Option<OsString>,
|
||||
// Control plane URI and compute ID used to refresh compute spec after the compute node
|
||||
// detects PS/tenant assignment changes. Passed through from the CLI.
|
||||
pub control_plane_uri: Option<String>,
|
||||
/// Hadron instance ID of the compute node.
|
||||
pub instance_id: Option<String>,
|
||||
/// Timeout of PG compute startup in the Init state.
|
||||
pub pg_init_timeout: Option<Duration>,
|
||||
// Path to the `pg_isready` binary.
|
||||
pub pg_isready_bin: String,
|
||||
}
|
||||
|
||||
type TaskHandle = Mutex<Option<JoinHandle<()>>>;
|
||||
@@ -405,6 +425,124 @@ struct StartVmMonitorResult {
|
||||
vm_monitor: Option<JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
|
||||
// BEGIN_HADRON
|
||||
/// This function creates roles that are used by Databricks.
|
||||
/// These roles are not needs to be botostrapped at PG Compute provisioning time.
|
||||
/// The auth method for these roles are configured in databricks_pg_hba.conf in universe repository.
|
||||
pub(crate) fn create_databricks_roles() -> Vec<String> {
|
||||
let roles = vec![
|
||||
// Role for prometheus_stats_exporter
|
||||
Role {
|
||||
name: "databricks_monitor".to_string(),
|
||||
// This uses "local" connection and auth method for that is "trust", so no password is needed.
|
||||
encrypted_password: None,
|
||||
options: Some(vec![GenericOption {
|
||||
name: "IN ROLE pg_monitor".to_string(),
|
||||
value: None,
|
||||
vartype: "string".to_string(),
|
||||
}]),
|
||||
},
|
||||
// Role for brickstore control plane
|
||||
Role {
|
||||
name: "databricks_control_plane".to_string(),
|
||||
// Certificate user does not need password.
|
||||
encrypted_password: None,
|
||||
options: Some(vec![GenericOption {
|
||||
name: "SUPERUSER".to_string(),
|
||||
value: None,
|
||||
vartype: "string".to_string(),
|
||||
}]),
|
||||
},
|
||||
// Role for brickstore httpgateway.
|
||||
Role {
|
||||
name: "databricks_gateway".to_string(),
|
||||
// Certificate user does not need password.
|
||||
encrypted_password: None,
|
||||
options: None,
|
||||
},
|
||||
];
|
||||
|
||||
roles
|
||||
.into_iter()
|
||||
.map(|role| {
|
||||
let query = format!(
|
||||
r#"
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT FROM pg_catalog.pg_roles WHERE rolname = '{}')
|
||||
THEN
|
||||
CREATE ROLE {} {};
|
||||
END IF;
|
||||
END
|
||||
$$;"#,
|
||||
role.name,
|
||||
role.name.pg_quote(),
|
||||
role.to_pg_options(),
|
||||
);
|
||||
query
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Databricks-specific environment variables to be passed to the `postgres` sub-process.
|
||||
pub struct DatabricksEnvVars {
|
||||
/// The Databricks "endpoint ID" of the compute instance. Used by `postgres` to check
|
||||
/// the token scopes of internal auth tokens.
|
||||
pub endpoint_id: String,
|
||||
/// Hostname of the Databricks workspace URL this compute instance belongs to.
|
||||
/// Used by postgres to verify Databricks PAT tokens.
|
||||
pub workspace_host: String,
|
||||
}
|
||||
|
||||
impl DatabricksEnvVars {
|
||||
pub fn new(
|
||||
compute_spec: &ComputeSpec,
|
||||
compute_id: Option<String>,
|
||||
instance_id: Option<String>,
|
||||
) -> Self {
|
||||
let endpoint_id = if let Some(instance_id) = instance_id {
|
||||
// Use instance_id as endpoint_id if it is set. This code path is for PuPr model.
|
||||
instance_id
|
||||
} else {
|
||||
// Use compute_id as endpoint_id if instance_id is not set. The code path is for PrPr model.
|
||||
// compute_id is a string format of "{endpoint_id}/{compute_idx}"
|
||||
// endpoint_id is a uuid. We only need to pass down endpoint_id to postgres.
|
||||
// Panics if compute_id is not set or not in the expected format.
|
||||
compute_id.unwrap().split('/').next().unwrap().to_string()
|
||||
};
|
||||
let workspace_host = compute_spec
|
||||
.databricks_settings
|
||||
.as_ref()
|
||||
.map(|s| s.databricks_workspace_host.clone())
|
||||
.unwrap_or("".to_string());
|
||||
Self {
|
||||
endpoint_id,
|
||||
workspace_host,
|
||||
}
|
||||
}
|
||||
|
||||
/// Constants for the names of Databricks-specific postgres environment variables.
|
||||
const DATABRICKS_ENDPOINT_ID_ENVVAR: &'static str = "DATABRICKS_ENDPOINT_ID";
|
||||
const DATABRICKS_WORKSPACE_HOST_ENVVAR: &'static str = "DATABRICKS_WORKSPACE_HOST";
|
||||
|
||||
/// Convert DatabricksEnvVars to a list of string pairs that can be passed as env vars. Consumes `self`.
|
||||
pub fn to_env_var_list(self) -> Vec<(String, String)> {
|
||||
vec![
|
||||
(
|
||||
Self::DATABRICKS_ENDPOINT_ID_ENVVAR.to_string(),
|
||||
self.endpoint_id.clone(),
|
||||
),
|
||||
(
|
||||
Self::DATABRICKS_WORKSPACE_HOST_ENVVAR.to_string(),
|
||||
self.workspace_host.clone(),
|
||||
),
|
||||
]
|
||||
}
|
||||
}
|
||||
// END_HADRON
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
|
||||
let connstr = params.connstr.as_str();
|
||||
@@ -511,7 +649,15 @@ impl ComputeNode {
|
||||
let mut vm_monitor = None;
|
||||
let mut pg_process: Option<PostgresHandle> = None;
|
||||
|
||||
match this.start_compute(&mut pg_process) {
|
||||
// BEGIN HADRON
|
||||
let compute_id = this.params.compute_id.clone();
|
||||
let instance_id: Option<String> = std::env::var("INSTANCE_ID").ok();
|
||||
let hadron_mode = instance_id.is_some();
|
||||
let databricks_env_vars =
|
||||
DatabricksEnvVars::new(&pspec.spec, Some(compute_id), instance_id);
|
||||
// END HADRON
|
||||
|
||||
match this.start_compute(&mut pg_process, databricks_env_vars) {
|
||||
Ok(()) => {
|
||||
// Success! Launch remaining services (just vm-monitor currently)
|
||||
vm_monitor =
|
||||
@@ -565,6 +711,12 @@ impl ComputeNode {
|
||||
// Reap the postgres process
|
||||
delay_exit |= this.cleanup_after_postgres_exit()?;
|
||||
|
||||
let mut delay_secs = 30;
|
||||
// Shorten delay exit when compute ctl is running in Hadron
|
||||
if hadron_mode {
|
||||
delay_secs = 2;
|
||||
}
|
||||
|
||||
// /terminate returns LSN. If we don't sleep at all, connection will break and we
|
||||
// won't get result. If we sleep too much, tests will take significantly longer
|
||||
// and Github Action run will error out
|
||||
@@ -577,7 +729,8 @@ impl ComputeNode {
|
||||
// If launch failed, keep serving HTTP requests for a while, so the cloud
|
||||
// control plane can get the actual error.
|
||||
if delay_exit {
|
||||
info!("giving control plane 30s to collect the error before shutdown");
|
||||
info!("giving control plane {delay_secs}s to collect the error before shutdown");
|
||||
std::thread::sleep(Duration::from_secs(delay_secs));
|
||||
}
|
||||
std::thread::sleep(sleep_duration);
|
||||
Ok(exit_code)
|
||||
@@ -624,7 +777,11 @@ impl ComputeNode {
|
||||
///
|
||||
/// Note that this is in the critical path of a compute cold start. Keep this fast.
|
||||
/// Try to do things concurrently, to hide the latencies.
|
||||
fn start_compute(self: &Arc<Self>, pg_handle: &mut Option<PostgresHandle>) -> Result<()> {
|
||||
fn start_compute(
|
||||
self: &Arc<Self>,
|
||||
pg_handle: &mut Option<PostgresHandle>,
|
||||
databricks_env_vars: DatabricksEnvVars,
|
||||
) -> Result<()> {
|
||||
let compute_state: ComputeState;
|
||||
|
||||
let start_compute_span;
|
||||
@@ -833,7 +990,8 @@ impl ComputeNode {
|
||||
|
||||
////// START POSTGRES
|
||||
let start_time = Utc::now();
|
||||
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
let pg_process =
|
||||
self.start_postgres(pspec.storage_auth_token.clone(), databricks_env_vars)?;
|
||||
let postmaster_pid = pg_process.pid();
|
||||
*pg_handle = Some(pg_process);
|
||||
|
||||
@@ -1038,7 +1196,9 @@ impl ComputeNode {
|
||||
// If it is something different then create_dir() will error out anyway.
|
||||
let pgdata = &self.params.pgdata;
|
||||
let _ok = fs::remove_dir_all(pgdata);
|
||||
fs::create_dir(pgdata)?;
|
||||
// Ignore creation errors if the directory already exists (e.g. mounting it ahead of time).
|
||||
// If it is something different then PG startup will error out anyway.
|
||||
let _ok = fs::create_dir(pgdata);
|
||||
fs::set_permissions(pgdata, fs::Permissions::from_mode(0o700))?;
|
||||
|
||||
Ok(())
|
||||
@@ -1400,6 +1560,13 @@ impl ComputeNode {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
let databricks_settings = spec.databricks_settings.as_ref();
|
||||
|
||||
let postgres_port = self
|
||||
.params
|
||||
.connstr
|
||||
.port()
|
||||
.expect("port must be present in connstr");
|
||||
|
||||
let tls_config = self.tls_config(&pspec.spec);
|
||||
|
||||
@@ -1409,8 +1576,10 @@ impl ComputeNode {
|
||||
pgdata_path,
|
||||
&self.params,
|
||||
&pspec.spec,
|
||||
postgres_port,
|
||||
self.params.internal_http_port,
|
||||
tls_config,
|
||||
databricks_settings,
|
||||
)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
@@ -1453,6 +1622,21 @@ impl ComputeNode {
|
||||
// Update pg_hba.conf received with basebackup.
|
||||
update_pg_hba(pgdata_path, None)?;
|
||||
|
||||
if let Some(settings) = databricks_settings {
|
||||
copy_tls_certificates(
|
||||
&settings.pg_compute_tls_settings.key_file,
|
||||
&settings.pg_compute_tls_settings.cert_file,
|
||||
pgdata_path,
|
||||
)?;
|
||||
|
||||
// Update pg_hba.conf received with basebackup including additional databricks settings.
|
||||
update_pg_hba(pgdata_path, Some(&settings.databricks_pg_hba))?;
|
||||
update_pg_ident(pgdata_path, Some(&settings.databricks_pg_ident))?;
|
||||
} else {
|
||||
// Update pg_hba.conf received with basebackup.
|
||||
update_pg_hba(pgdata_path, None)?;
|
||||
}
|
||||
|
||||
// Place pg_dynshmem under /dev/shm. This allows us to use
|
||||
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
|
||||
// /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
|
||||
@@ -1507,6 +1691,13 @@ impl ComputeNode {
|
||||
|
||||
/// Start and stop a postgres process to warm up the VM for startup.
|
||||
pub fn prewarm_postgres_vm_memory(&self) -> Result<()> {
|
||||
if self.params.instance_id.is_some() {
|
||||
// We are running in Hadron mode. Disabling this prewarming step for now as it could run
|
||||
// into dblet port conflicts and also doesn't add much value with our current infra.
|
||||
info!("Skipping postgres prewarming in Hadron mode");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("prewarming VM memory");
|
||||
|
||||
// Create pgdata
|
||||
@@ -1561,17 +1752,25 @@ impl ComputeNode {
|
||||
///
|
||||
/// Returns a handle to the child process and a handle to the logs thread.
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_postgres(&self, storage_auth_token: Option<String>) -> Result<PostgresHandle> {
|
||||
pub fn start_postgres(
|
||||
&self,
|
||||
storage_auth_token: Option<String>,
|
||||
databricks_env_vars: DatabricksEnvVars,
|
||||
) -> Result<PostgresHandle> {
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
|
||||
info!(
|
||||
"Starting Postgres for databricks endpoint id: {}",
|
||||
&databricks_env_vars.endpoint_id
|
||||
);
|
||||
|
||||
let mut env_vars = databricks_env_vars.to_env_var_list();
|
||||
env_vars.extend(storage_auth_token.map(|t| ("NEON_AUTH_TOKEN".to_string(), t)));
|
||||
|
||||
// Run postgres as a child process.
|
||||
let mut pg = maybe_cgexec(&self.params.pgbin)
|
||||
let mut pg: std::process::Child = maybe_cgexec(&self.params.pgbin)
|
||||
.args(["-D", &self.params.pgdata])
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
||||
} else {
|
||||
vec![]
|
||||
})
|
||||
.envs(env_vars)
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.expect("cannot start postgres process");
|
||||
@@ -1723,7 +1922,12 @@ impl ComputeNode {
|
||||
/// Do initial configuration of the already started Postgres.
|
||||
#[instrument(skip_all)]
|
||||
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
|
||||
let mut conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
|
||||
// Set a 2-minute statement_timeout for the session applying config. The individual SQL statements
|
||||
// used in apply_spec_sql() should not take long (they are just creating users and installing
|
||||
// extensions). If any of them are stuck for an extended period of time it usually indicates a
|
||||
// pageserver connectivity problem and we should bail out.
|
||||
conf.options("-c statement_timeout=2min");
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let spec = Arc::new(
|
||||
@@ -1756,7 +1960,6 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Run migrations separately to not hold up cold starts
|
||||
let lakebase_mode = self.params.lakebase_mode;
|
||||
let params = self.params.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut conf = conf.as_ref().clone();
|
||||
@@ -1769,7 +1972,7 @@ impl ComputeNode {
|
||||
eprintln!("connection error: {e}");
|
||||
}
|
||||
});
|
||||
if let Err(e) = handle_migrations(params, &mut client, lakebase_mode).await {
|
||||
if let Err(e) = handle_migrations(params, &mut client).await {
|
||||
error!("Failed to run migrations: {}", e);
|
||||
}
|
||||
}
|
||||
@@ -1785,6 +1988,35 @@ impl ComputeNode {
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
|
||||
|
||||
// Signal to the configurator to refresh the configuration by pulling a new spec from the HCC.
|
||||
// Note that this merely triggers a notification on a condition variable the configurator thread
|
||||
// waits on. The configurator thread (in configurator.rs) pulls the new spec from the HCC and
|
||||
// applies it.
|
||||
pub async fn signal_refresh_configuration(&self) -> Result<()> {
|
||||
let states_allowing_configuration_refresh = [
|
||||
ComputeStatus::Running,
|
||||
ComputeStatus::Failed,
|
||||
ComputeStatus::RefreshConfigurationPending,
|
||||
];
|
||||
|
||||
let mut state = self.state.lock().expect("state lock poisoned");
|
||||
if states_allowing_configuration_refresh.contains(&state.status) {
|
||||
state.status = ComputeStatus::RefreshConfigurationPending;
|
||||
self.state_changed.notify_all();
|
||||
Ok(())
|
||||
} else if state.status == ComputeStatus::Init {
|
||||
// If the compute is in Init state, we can't refresh the configuration immediately,
|
||||
// but we should be able to do that soon.
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!(
|
||||
"Cannot refresh compute configuration in state {:?}",
|
||||
state.status
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
// Wrapped this around `pg_ctl reload`, but right now we don't use
|
||||
// `pg_ctl` for start / stop.
|
||||
#[instrument(skip_all)]
|
||||
@@ -1805,6 +2037,7 @@ impl ComputeNode {
|
||||
#[instrument(skip_all)]
|
||||
pub fn reconfigure(&self) -> Result<()> {
|
||||
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
|
||||
let databricks_settings = spec.databricks_settings.as_ref();
|
||||
|
||||
let tls_config = self.tls_config(&spec);
|
||||
|
||||
@@ -1854,6 +2087,21 @@ impl ComputeNode {
|
||||
tls_config,
|
||||
)?;
|
||||
|
||||
|
||||
let postgres_port = self
|
||||
.params
|
||||
.connstr
|
||||
.port()
|
||||
.expect("port must be present in connstr");
|
||||
config::write_postgres_conf(
|
||||
pgdata_path,
|
||||
&spec,
|
||||
postgres_port,
|
||||
self.params.internal_http_port,
|
||||
&self.compute_ctl_config.tls,
|
||||
databricks_settings,
|
||||
)?;
|
||||
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
if !spec.skip_pg_catalog_updates {
|
||||
@@ -1960,6 +2208,15 @@ impl ComputeNode {
|
||||
| ComputeStatus::Empty => {
|
||||
state = self.state_changed.wait(state).unwrap();
|
||||
}
|
||||
|
||||
// BEGIN HADRON
|
||||
// Hadron does not currently use TLS in compute_ctl, but adding a wait
|
||||
// here for completeness. The compute-initiated configuration
|
||||
// refresh should be orthogonal to TLS certificate updates for now.
|
||||
ComputeStatus::RefreshConfiguration
|
||||
| ComputeStatus::RefreshConfigurationPending => {
|
||||
state = self.state_changed.wait(state).unwrap();
|
||||
} // END HADRON"neon
|
||||
}
|
||||
}
|
||||
drop(state);
|
||||
@@ -2320,7 +2577,7 @@ LIMIT 100",
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
libs_vec = libs
|
||||
.split(&[',', '\'', ' '])
|
||||
.filter(|s| *s != "neon" && !s.is_empty())
|
||||
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
}
|
||||
@@ -2339,7 +2596,7 @@ LIMIT 100",
|
||||
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
|
||||
preload_libs_vec = libs
|
||||
.split(&[',', '\'', ' '])
|
||||
.filter(|s| *s != "neon" && !s.is_empty())
|
||||
.filter(|s| *s != "neon" && *s != "databricks_auth" && !s.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
}
|
||||
@@ -2416,6 +2673,34 @@ LIMIT 100",
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the compute spec and update related metrics.
|
||||
/// This is the central place where pspec is updated.
|
||||
pub fn set_spec(params: &ComputeNodeParams, state: &mut ComputeState, pspec: ParsedSpec) {
|
||||
state.pspec = Some(pspec);
|
||||
ComputeNode::update_attached_metric(params, state);
|
||||
let _ = logger::update_ids(¶ms.instance_id, &Some(params.compute_id.clone()));
|
||||
}
|
||||
|
||||
pub fn update_attached_metric(params: &ComputeNodeParams, state: &mut ComputeState) {
|
||||
// Update the pg_cctl_attached gauge when all identifiers are available.
|
||||
if let Some(instance_id) = ¶ms.instance_id {
|
||||
if let Some(pspec) = &state.pspec {
|
||||
// Clear all values in the metric
|
||||
COMPUTE_ATTACHED.reset();
|
||||
|
||||
// Set new metric value
|
||||
COMPUTE_ATTACHED
|
||||
.with_label_values(&[
|
||||
¶ms.compute_id,
|
||||
instance_id,
|
||||
&pspec.tenant_id.to_string(),
|
||||
&pspec.timeline_id.to_string(),
|
||||
])
|
||||
.set(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_extension_stats_task(&self) {
|
||||
self.terminate_extension_stats_task();
|
||||
|
||||
|
||||
60
compute_tools/src/hadron_metrics.rs
Normal file
60
compute_tools/src/hadron_metrics.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
use metrics::{
|
||||
IntCounter, IntGaugeVec, core::Collector, proto::MetricFamily, register_int_counter,
|
||||
register_int_gauge_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
// Counter keeping track of the number of PageStream request errors reported by Postgres.
|
||||
// An error is registered every time Postgres calls compute_ctl's /refresh_configuration API.
|
||||
// Postgres will invoke this API if it detected trouble with PageStream requests (get_page@lsn,
|
||||
// get_base_backup, etc.) it sends to any pageserver. An increase in this counter value typically
|
||||
// indicates Postgres downtime, as PageStream requests are critical for Postgres to function.
|
||||
pub static POSTGRES_PAGESTREAM_REQUEST_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pg_cctl_pagestream_request_errors_total",
|
||||
"Number of PageStream request errors reported by the postgres process"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Counter keeping track of the number of compute configuration errors due to Postgres statement
|
||||
// timeouts. An error is registered every time `ComputeNode::reconfigure()` fails due to Postgres
|
||||
// error code 57014 (query cancelled). This statement timeout typically occurs when postgres is
|
||||
// stuck in a problematic retry loop when the PS is reject its connection requests (usually due
|
||||
// to PG pointing at the wrong PS). We should investigate the root cause when this counter value
|
||||
// increases by checking PG and PS logs.
|
||||
pub static COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pg_cctl_configure_statement_timeout_errors_total",
|
||||
"Number of compute configuration errors due to Postgres statement timeouts."
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static COMPUTE_ATTACHED: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pg_cctl_attached",
|
||||
"Compute node attached status (1 if attached)",
|
||||
&[
|
||||
"pg_compute_id",
|
||||
"pg_instance_id",
|
||||
"tenant_id",
|
||||
"timeline_id"
|
||||
]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
let mut metrics = Vec::new();
|
||||
metrics.extend(POSTGRES_PAGESTREAM_REQUEST_ERRORS.collect());
|
||||
metrics.extend(COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS.collect());
|
||||
metrics.extend(COMPUTE_ATTACHED.collect());
|
||||
metrics
|
||||
}
|
||||
|
||||
pub fn initialize_metrics() {
|
||||
Lazy::force(&POSTGRES_PAGESTREAM_REQUEST_ERRORS);
|
||||
Lazy::force(&COMPUTE_CONFIGURE_STATEMENT_TIMEOUT_ERRORS);
|
||||
Lazy::force(&COMPUTE_ATTACHED);
|
||||
}
|
||||
@@ -16,6 +16,7 @@ pub mod compute_prewarm;
|
||||
pub mod compute_promote;
|
||||
pub mod disk_quota;
|
||||
pub mod extension_server;
|
||||
pub mod hadron_metrics;
|
||||
pub mod installed_extensions;
|
||||
pub mod local_proxy;
|
||||
pub mod lsn_lease;
|
||||
|
||||
Reference in New Issue
Block a user