Compare commits

..

12 Commits

Author SHA1 Message Date
Conrad Ludgate
cc66f78d01 update readme 2025-07-31 11:51:44 +01:00
Conrad Ludgate
f9e6802974 s/ssl/tls 2025-07-30 14:03:22 +01:00
Conrad Ludgate
74afc9d96f refactor pgbouncer tuning 2025-07-30 12:34:36 +01:00
Conrad Ludgate
86fe3150f0 add basic tls test 2025-07-30 12:34:31 +01:00
Conrad Ludgate
52be0146d3 fix runtime 2025-07-30 12:32:23 +01:00
Conrad Ludgate
a3f2a2cae5 add fast path for TLS renewal configuration 2025-07-30 12:29:41 +01:00
Conrad Ludgate
a24a0032ad update certificate files in the watch task 2025-07-30 11:47:34 +01:00
Conrad Ludgate
70cb02742a pass in the tls_config as a param to watch_certs_for_changes, also wait for it to complete before configuring pgbouncer/local_proxy 2025-07-30 11:47:07 +01:00
Conrad Ludgate
a845295cb3 refactor TLS processing. Only use blocking-IO, split out the loading of certificates from the updating of certificates 2025-07-30 10:29:03 +01:00
Conrad Ludgate
e288cd2198 fix concurrent reconfigure while TLS configuration is taking place 2025-07-30 10:14:20 +01:00
Conrad Ludgate
ffa9e595b8 introduce separate reload commands 2025-07-30 10:14:17 +01:00
Conrad Ludgate
e7b1f63f68 add logs for TLS 2025-07-30 10:08:04 +01:00
103 changed files with 691 additions and 1699 deletions

View File

@@ -1 +1 @@
SELECT num_requested AS checkpoints_req FROM pg_catalog.pg_stat_checkpointer;
SELECT num_requested AS checkpoints_req FROM pg_stat_checkpointer;

View File

@@ -1 +1 @@
SELECT checkpoints_req FROM pg_catalog.pg_stat_bgwriter;
SELECT checkpoints_req FROM pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT checkpoints_timed FROM pg_catalog.pg_stat_bgwriter;
SELECT checkpoints_timed FROM pg_stat_bgwriter;

View File

@@ -1 +1 @@
SELECT (neon.backpressure_throttling_time()::pg_catalog.float8 / 1000000) AS throttled;
SELECT (neon.backpressure_throttling_time()::float8 / 1000000) AS throttled;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_replay_lsn() - '0/0')::pg_catalog.FLOAT8
ELSE (pg_catalog.pg_current_wal_lsn() - '0/0')::pg_catalog.FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_replay_lsn() - '0/0')::FLOAT8
ELSE (pg_current_wal_lsn() - '0/0')::FLOAT8
END AS lsn;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COUNT(*) FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;

View File

@@ -1,7 +1,7 @@
SELECT
(SELECT pg_catalog.current_setting('neon.timeline_id')) AS timeline_id,
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(pg_catalog.sum(size), 0) FROM pg_catalog.pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;

View File

@@ -1,9 +1,9 @@
SELECT
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(pg_catalog.sum((pg_catalog.pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
) AS logical_snapshots_bytes;

View File

@@ -1 +1 @@
SELECT pg_catalog.current_setting('max_connections') AS max_connections;
SELECT current_setting('max_connections') as max_connections;

View File

@@ -1,4 +1,4 @@
SELECT datname database_name,
pg_catalog.age(datfrozenxid) frozen_xid_age
FROM pg_catalog.pg_database
age(datfrozenxid) frozen_xid_age
FROM pg_database
ORDER BY frozen_xid_age DESC LIMIT 10;

View File

@@ -1,4 +1,4 @@
SELECT datname database_name,
pg_catalog.mxid_age(datminmxid) min_mxid_age
FROM pg_catalog.pg_database
mxid_age(datminmxid) min_mxid_age
FROM pg_database
ORDER BY min_mxid_age DESC LIMIT 10;

View File

@@ -1,4 +1,4 @@
SELECT CASE
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_receive_lsn() - '0/0')::pg_catalog.FLOAT8
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
ELSE 0
END AS lsn;

View File

@@ -1 +1 @@
SELECT subenabled::pg_catalog.text AS enabled, pg_catalog.count(*) AS subscriptions_count FROM pg_catalog.pg_subscription GROUP BY subenabled;
SELECT subenabled::text AS enabled, count(*) AS subscriptions_count FROM pg_subscription GROUP BY subenabled;

View File

@@ -1 +1 @@
SELECT datname, state, pg_catalog.count(*) AS count FROM pg_catalog.pg_stat_activity WHERE state <> '' GROUP BY datname, state;
SELECT datname, state, count(*) AS count FROM pg_stat_activity WHERE state <> '' GROUP BY datname, state;

View File

@@ -1,5 +1,5 @@
SELECT pg_catalog.sum(pg_catalog.pg_database_size(datname)) AS total
FROM pg_catalog.pg_database
SELECT sum(pg_database_size(datname)) AS total
FROM pg_database
-- Ignore invalid databases, as we will likely have problems with
-- getting their size from the Pageserver.
WHERE datconnlimit != -2;

View File

@@ -3,6 +3,6 @@
-- minutes.
SELECT
x::pg_catalog.text AS duration_seconds,
x::text as duration_seconds,
neon.approximate_working_set_size_seconds(x) AS size
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);

View File

@@ -3,6 +3,6 @@
SELECT
x AS duration,
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::pg_catalog.interval)::pg_catalog.int4) AS size FROM (
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
VALUES ('5m'), ('15m'), ('1h')
) AS t (x);

View File

@@ -1 +1 @@
SELECT pg_catalog.pg_size_bytes(pg_catalog.current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;
SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;

View File

@@ -1,3 +1,3 @@
SELECT slot_name, (restart_lsn - '0/0')::pg_catalog.FLOAT8 AS restart_lsn
FROM pg_catalog.pg_replication_slots
SELECT slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';

View File

@@ -1 +1 @@
SELECT setting::pg_catalog.int4 AS max_cluster_size FROM pg_catalog.pg_settings WHERE name = 'neon.max_cluster_size';
SELECT setting::int AS max_cluster_size FROM pg_settings WHERE name = 'neon.max_cluster_size';

View File

@@ -1,13 +1,13 @@
-- We export stats for 10 non-system databases. Without this limit it is too
-- easy to abuse the system by creating lots of databases.
SELECT pg_catalog.pg_database_size(datname) AS db_size,
SELECT pg_database_size(datname) AS db_size,
deadlocks,
tup_inserted AS inserted,
tup_updated AS updated,
tup_deleted AS deleted,
datname
FROM pg_catalog.pg_stat_database
FROM pg_stat_database
WHERE datname IN (
SELECT datname FROM pg_database
-- Ignore invalid databases, as we will likely have problems with

View File

@@ -3,4 +3,4 @@
-- replay LSN may have advanced past the receive LSN we are using for the
-- calculation.
SELECT GREATEST(0, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn())) AS replication_delay_bytes;
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;

View File

@@ -1,5 +1,5 @@
SELECT
CASE
WHEN pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM pg_catalog.now() - pg_catalog.pg_last_xact_replay_timestamp()))
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0
ELSE GREATEST(0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()))
END AS replication_delay_seconds;

View File

@@ -1,10 +1,10 @@
SELECT
slot_name,
pg_catalog.pg_wal_lsn_diff(
pg_wal_lsn_diff(
CASE
WHEN pg_catalog.pg_is_in_recovery() THEN pg_catalog.pg_last_wal_replay_lsn()
ELSE pg_catalog.pg_current_wal_lsn()
WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
ELSE pg_current_wal_lsn()
END,
restart_lsn)::pg_catalog.FLOAT8 AS retained_wal
FROM pg_catalog.pg_replication_slots
restart_lsn)::FLOAT8 AS retained_wal
FROM pg_replication_slots
WHERE active = false;

View File

@@ -4,4 +4,4 @@ SELECT
WHEN wal_status = 'lost' THEN 1
ELSE 0
END AS wal_is_lost
FROM pg_catalog.pg_replication_slots;
FROM pg_replication_slots;

View File

@@ -57,6 +57,9 @@ stateDiagram-v2
RefreshConfigurationPending --> RefreshConfiguration: Received compute spec and started configuration
RefreshConfiguration --> Running : Compute has been re-configured
RefreshConfiguration --> RefreshConfigurationPending : Configuration failed and to be retried
Running --> Reloading : Local changes (TLS certificate renewal) were detected and postgres is being reloaded
Reloading --> Running : Postgres was reloaded
Reloading --> Failed : Failed to reload postgres
TerminationPendingFast --> Terminated : Terminated compute with 30s delay for cplane to inspect status
TerminationPendingImmediate --> Terminated : Terminated compute immediately
Failed --> RefreshConfigurationPending : Received a /refresh_configuration request

View File

@@ -279,7 +279,7 @@ fn main() -> Result<()> {
config,
)?;
let exit_code = compute_node.run().context("running compute node")?;
let exit_code = compute_node.run()?;
scenario.teardown();

View File

@@ -24,9 +24,9 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
});
let query = "
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = pg_catalog.now();";
SET updated_at = now();";
match client.simple_query(query).await {
Result::Ok(result) => {

View File

@@ -28,16 +28,12 @@ use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration,
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
@@ -61,7 +57,6 @@ use crate::rsyslog::{
use crate::spec::*;
use crate::swap::resize_swap;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::tls::watch_cert_for_changes;
use crate::{config, extension_server, local_proxy};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
@@ -196,7 +191,6 @@ pub struct ComputeState {
pub startup_span: Option<tracing::span::Span>,
pub lfc_prewarm_state: LfcPrewarmState,
pub lfc_prewarm_token: CancellationToken,
pub lfc_offload_state: LfcOffloadState,
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
@@ -222,7 +216,6 @@ impl ComputeState {
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
promote_state: None,
lfc_prewarm_token: CancellationToken::new(),
}
}
@@ -589,7 +582,7 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0 -c pgaudit.log=none";
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
let options = match conn_conf.get_options() {
// Allow the control plane to override any options set by the
// compute
@@ -848,14 +841,11 @@ impl ComputeNode {
let mut pre_tasks = tokio::task::JoinSet::new();
// Make sure TLS certificates are properly loaded and in the right place.
if self.compute_ctl_config.tls.is_some() {
let tls_task = self.compute_ctl_config.tls.as_ref().map(|tls_config| {
let this = self.clone();
pre_tasks.spawn(async move {
this.watch_cert_for_changes().await;
Ok::<(), anyhow::Error>(())
});
}
let tls_config = tls_config.clone();
tokio::task::spawn_blocking(|| this.watch_cert_for_changes(tls_config))
});
let tls_config = self.tls_config(&pspec.spec);
@@ -910,6 +900,13 @@ impl ComputeNode {
});
}
// Wait for TLS certificates to be issued before updating pgbouncer and local proxy.
let rt = tokio::runtime::Handle::current();
if let Some(tls_task) = tls_task {
rt.block_on(tls_task)
.context("TLS certificate renewal task panicked")?;
}
// tune pgbouncer
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
@@ -992,7 +989,6 @@ impl ComputeNode {
let _configurator_handle = launch_configurator(self);
// Wait for all the pre-tasks to finish before starting postgres
let rt = tokio::runtime::Handle::current();
while let Some(res) = rt.block_on(pre_tasks.join_next()) {
res??;
}
@@ -1560,41 +1556,6 @@ impl ComputeNode {
Ok(lsn)
}
/// Call `sync_safekeepers`, retrying on failure with exponential backoff.
///
/// The initial call is followed by at most `max_retries` further calls. The
/// function returns as soon as one call succeeds, or with the last error once
/// the retry budget is exhausted. The calling thread sleeps between attempts.
fn sync_safekeepers_with_retries(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
    let max_retries = 5;
    // Number of retries performed so far; also feeds the backoff calculation.
    let mut attempts = 0;

    loop {
        match self.sync_safekeepers(storage_auth_token.clone()) {
            // Retry budget not yet spent: log and fall through to the backoff below.
            Err(e) if attempts < max_retries => {
                tracing::info!(
                    "sync_safekeepers failed, will retry (attempt {attempts}): {e:#}"
                );
            }
            // Out of retries: surface the last error to the caller.
            Err(err) => {
                tracing::warn!(
                    "sync_safekeepers still failed after {attempts} retries, giving up: {err:?}"
                );
                return Err(err);
            }
            Ok(lsn) => {
                if attempts > 0 {
                    tracing::info!("sync_safekeepers succeeded after {attempts} retries");
                }
                return Ok(lsn);
            }
        }

        // sleep and retry
        std::thread::sleep(exponential_backoff_duration(
            attempts,
            DEFAULT_BASE_BACKOFF_SECONDS,
            DEFAULT_MAX_BACKOFF_SECONDS,
        ));
        attempts += 1;
    }
}
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
@@ -1630,7 +1591,7 @@ impl ComputeNode {
lsn
} else {
info!("starting safekeepers syncing");
self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone())
self.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?
};
info!("safekeepers synced at LSN {}", lsn);
@@ -1925,7 +1886,7 @@ impl ComputeNode {
// It doesn't matter what were the options before, here we just want
// to connect and create a new superuser role.
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0";
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
zenith_admin_conf.options(ZENITH_OPTIONS);
let mut client =
@@ -1990,10 +1951,7 @@ impl ComputeNode {
.clone(),
);
let mut tls_config = None::<TlsConfig>;
if spec.features.contains(&ComputeFeature::TlsExperimental) {
tls_config = self.compute_ctl_config.tls.clone();
}
let tls_config = self.tls_config(&spec);
self.update_installed_extensions_collection_interval(&spec);
@@ -2175,6 +2133,60 @@ impl ComputeNode {
Ok(())
}
/// Ask postgres, pgbouncer and local_proxy to reload their configurations.
///
/// pgbouncer is reloaded only when `spec.pgbouncer_settings` is set, and
/// local_proxy only when `spec.local_proxy_config` is set; `pg_reload_conf`
/// is always called. The first failing step aborts the rest and its error is
/// returned.
#[instrument(skip_all)]
pub fn reload(&self, spec: ComputeSpec) -> Result<()> {
    // Grab the runtime handle up front, exactly once, regardless of which
    // components end up being reloaded.
    let runtime = tokio::runtime::Handle::current();

    if spec.pgbouncer_settings.is_some() {
        runtime.block_on(reload_pgbouncer())?;
    }

    if spec.local_proxy_config.is_some() {
        local_proxy::reload()?;
    }

    self.pg_reload_conf()?;

    // Fall back to a placeholder when the spec carries no operation id.
    let op_id = spec.operation_uuid.as_deref().unwrap_or("unknown");
    info!("finished reload of compute node for operation {op_id}");

    Ok(())
}
/// Run `f` while the compute status is set to `Reloading`.
///
/// Holding the "reloading" status ensures that this thread is the only
/// thread that can issue signals to postgres.
///
/// The state mutex itself is released while `f` runs and re-acquired
/// afterwards. On success the status that was in effect before the call is
/// restored; if `f` returns an error the status becomes `Failed`. The result
/// of `f` is returned unchanged.
///
/// Panics if `state.pspec` is `None`, or if the status is no longer
/// `Reloading` once `f` has finished.
pub fn lock_while_reloading<T>(
    &self,
    mut state: MutexGuard<'_, ComputeState>,
    f: impl FnOnce(ComputeSpec) -> Result<T>,
) -> Result<T> {
    // Remember where we came from so it can be restored on success.
    let status_before = state.status;

    // transition to the reloading state.
    state.set_status(ComputeStatus::Reloading, &self.state_changed);
    let spec = state.pspec.as_ref().unwrap().spec.clone();

    // unlock while reloading, so we don't block other tasks.
    drop(state);

    let outcome = f(spec);

    let status_after = match &outcome {
        Ok(_) => status_before,
        Err(_) => ComputeStatus::Failed,
    };

    let mut state = self.state.lock().unwrap();
    // make sure our invariants are upheld
    assert_eq!(state.status, ComputeStatus::Reloading);
    state.set_status(status_after, &self.state_changed);

    outcome
}
#[instrument(skip_all)]
pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
@@ -2209,57 +2221,103 @@ impl ComputeNode {
Ok(())
}
pub async fn watch_cert_for_changes(self: Arc<Self>) {
// update status on cert renewal
if let Some(tls_config) = &self.compute_ctl_config.tls {
let tls_config = tls_config.clone();
pub fn watch_cert_for_changes(self: Arc<Self>, tls_config: TlsConfig) {
// wait until the cert exists.
let mut digest = crate::tls::compute_digest(&tls_config.cert_path);
info!(
cert_path = tls_config.cert_path,
key_path = tls_config.key_path,
"TLS certificates found"
);
// wait until the cert exists.
let mut cert_watch = watch_cert_for_changes(tls_config.cert_path.clone()).await;
// ensure the keys are saved before continuing.
let key_pair = crate::tls::load_certs_blocking(&tls_config);
while let Err(e) =
crate::tls::update_key_path_blocking(Path::new(&self.params.pgdata), &key_pair)
{
error!("could not save TLS certificates: {e}");
std::thread::sleep(Duration::from_millis(20));
}
tokio::task::spawn_blocking(move || {
let handle = tokio::runtime::Handle::current();
'cert_update: loop {
// let postgres/pgbouncer/local_proxy know the new cert/key exists.
// we need to wait until it's configurable first.
tokio::task::spawn_blocking(move || {
'cert_update: loop {
// wait for a new certificate update
let new_digest = crate::tls::wait_until_cert_changed(digest, &tls_config.cert_path);
let mut state = self.state.lock().unwrap();
'status_update: loop {
match state.status {
// let's update the state to config pending
ComputeStatus::ConfigurationPending | ComputeStatus::Running => {
state.set_status(
ComputeStatus::ConfigurationPending,
&self.state_changed,
);
break 'status_update;
}
// load the corresponding keys
let key_pair = crate::tls::load_certs_blocking(&tls_config);
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::Terminated => break 'cert_update,
// let postgres/pgbouncer/local_proxy know the new cert/key exists.
// we need to wait until it's configurable first.
// wait
ComputeStatus::Init
| ComputeStatus::Configuration
| ComputeStatus::RefreshConfiguration
| ComputeStatus::RefreshConfigurationPending
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
let mut state = self.state.lock().unwrap();
'status_update: loop {
match state.status {
// let's update the state to config pending
ComputeStatus::Running => {
info!("reloading compute due to TLS certificate renewal");
break 'status_update;
}
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::Terminated => break 'cert_update,
// wait
ComputeStatus::Init
| ComputeStatus::Configuration
| ComputeStatus::ConfigurationPending
| ComputeStatus::RefreshConfiguration
| ComputeStatus::RefreshConfigurationPending
| ComputeStatus::Reloading
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
}
drop(state);
}
// wait for a new certificate update
if handle.block_on(cert_watch.changed()).is_err() {
break;
let result = self.lock_while_reloading(state, |spec| {
// ensure the keys are saved before continuing.
// we do this while holding the 'reloading' state so that we know we're not interfering with any
// active configuration stages.
if let Err(e) = crate::tls::update_key_path_blocking(
Path::new(&self.params.pgdata),
&key_pair,
) {
return Ok(Err(e));
}
// reload postgres/pgbouncer/local_proxy to pick up our new certificates.
self.reload(spec)?;
Ok(Ok(()))
});
match result {
// Reload failed. Compute is in a bad state.
Err(e) => {
error!("could not reload compute node: {}", e);
return;
}
// Updating the certificates failed. Retry
Ok(Err(e)) => {
error!("could not save TLS certificates: {e}");
std::thread::sleep(Duration::from_millis(20));
}
// Successful. Acknowledge that we've saved these certificates.
Ok(Ok(())) => {
digest = new_digest;
info!(
cert_path = tls_config.cert_path,
key_path = tls_config.key_path,
"TLS certificates renewed",
);
}
}
});
}
}
});
}
pub fn tls_config(&self, spec: &ComputeSpec) -> &Option<TlsConfig> {
@@ -2380,13 +2438,13 @@ impl ComputeNode {
let result = client
.simple_query(
"SELECT
pg_catalog.row_to_json(pss)
row_to_json(pg_stat_statements)
FROM
public.pg_stat_statements pss
pg_stat_statements
WHERE
pss.userid != 'cloud_admin'::pg_catalog.regrole::pg_catalog.oid
userid != 'cloud_admin'::regrole::oid
ORDER BY
(pss.mean_exec_time + pss.mean_plan_time) DESC
(mean_exec_time + mean_plan_time) DESC
LIMIT 100",
)
.await;
@@ -2514,11 +2572,11 @@ LIMIT 100",
// check the role grants first - to gracefully handle read-replicas.
let select = "SELECT privilege_type
FROM pg_catalog.pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) AS acl ON true
JOIN pg_catalog.pg_user users ON acl.grantee = users.usesysid
WHERE users.usename OPERATOR(pg_catalog.=) $1::pg_catalog.name
AND nspname OPERATOR(pg_catalog.=) $2::pg_catalog.name";
FROM pg_namespace
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
JOIN pg_user users ON acl.grantee = users.usesysid
WHERE users.usename = $1
AND nspname = $2";
let rows = db_client
.query(select, &[role_name, schema_name])
.await
@@ -2587,9 +2645,8 @@ LIMIT 100",
.await
.with_context(|| format!("Failed to execute query: {query}"))?;
} else {
let query = format!(
"CREATE EXTENSION IF NOT EXISTS {ext_name} WITH SCHEMA public VERSION {quoted_version}"
);
let query =
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
db_client
.simple_query(&query)
.await

View File

@@ -7,8 +7,7 @@ use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tokio::{io::AsyncReadExt, spawn};
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
@@ -93,35 +92,34 @@ impl ComputeNode {
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
/// Has a failpoint "compute-prewarm"
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
let token: CancellationToken;
{
let state = &mut self.state.lock().unwrap();
token = state.lfc_prewarm_token.clone();
if let LfcPrewarmState::Prewarming =
replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
return false;
}
}
crate::metrics::LFC_PREWARMS.inc();
let this = self.clone();
let cloned = self.clone();
spawn(async move {
let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
Ok(state) => state,
let state = match cloned.prewarm_impl(from_endpoint).await {
Ok(true) => LfcPrewarmState::Completed,
Ok(false) => {
info!(
"skipping LFC prewarm because LFC state is not found in endpoint storage"
);
LfcPrewarmState::Skipped
}
Err(err) => {
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "could not prewarm LFC");
let error = format!("{err:#}");
LfcPrewarmState::Failed { error }
LfcPrewarmState::Failed {
error: format!("{err:#}"),
}
}
};
let state = &mut this.state.lock().unwrap();
if let LfcPrewarmState::Cancelled = prewarm_state {
state.lfc_prewarm_token = CancellationToken::new();
}
state.lfc_prewarm_state = prewarm_state;
cloned.state.lock().unwrap().lfc_prewarm_state = state;
});
true
}
@@ -134,70 +132,47 @@ impl ComputeNode {
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(
&self,
from_endpoint: Option<String>,
token: CancellationToken,
) -> Result<LfcPrewarmState> {
let EndpointStoragePair {
url,
token: storage_token,
} = self.endpoint_storage_pair(from_endpoint)?;
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
fail::fail_point!("compute-prewarm", |_| {
bail!("prewarm configured to fail because of a failpoint")
});
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(storage_token);
let response = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
response = request.send() => response
}
.context("querying endpoint storage")?;
match response.status() {
let request = Client::new().get(&url).bearer_auth(token);
let res = request.send().await.context("querying endpoint storage")?;
match res.status() {
StatusCode::OK => (),
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
StatusCode::NOT_FOUND => {
return Ok(false);
}
status => bail!("{status} querying endpoint storage"),
}
let mut uncompressed = Vec::new();
let lfc_state = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
lfc_state = response.bytes() => lfc_state
}
.context("getting request body from endpoint storage")?;
let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
read = decoder.read_to_end(&mut uncompressed) => read
}
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
// Client connection and prewarm info querying are fast and therefore don't need
// cancellation
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
let lfc_state = res
.bytes()
.await
.context("connecting to postgres")?;
let pg_token = client.cancel_token();
.context("getting request body from endpoint storage")?;
ZstdDecoder::new(lfc_state.iter().as_slice())
.read_to_end(&mut uncompressed)
.await
.context("decoding LFC state")?;
let uncompressed_len = uncompressed.len();
let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
select! {
res = client.query_one("select neon.prewarm_local_cache($1)", &params) => res,
_ = token.cancelled() => {
pg_token.cancel_query(postgres::NoTls).await
.context("cancelling neon.prewarm_local_cache()")?;
return Ok(LfcPrewarmState::Cancelled)
}
}
.context("loading LFC state into postgres")
.map(|_| ())?;
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
Ok(LfcPrewarmState::Completed)
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())?;
Ok(true)
}
/// If offload request is ongoing, return false, true otherwise
@@ -225,20 +200,20 @@ impl ComputeNode {
async fn offload_lfc_with_state_update(&self) {
crate::metrics::LFC_OFFLOADS.inc();
let state = match self.offload_lfc_impl().await {
Ok(state) => state,
Err(err) => {
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC");
let error = format!("{err:#}");
LfcOffloadState::Failed { error }
}
let Err(err) = self.offload_lfc_impl().await else {
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
self.state.lock().unwrap().lfc_offload_state = state;
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "could not offload LFC state to endpoint storage");
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: format!("{err:#}"),
};
}
async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
@@ -253,7 +228,7 @@ impl ComputeNode {
.context("deserializing LFC state")?;
let Some(state) = state else {
info!(%url, "empty LFC state, not exporting");
return Ok(LfcOffloadState::Skipped);
return Ok(());
};
let mut compressed = Vec::new();
@@ -267,7 +242,7 @@ impl ComputeNode {
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
Ok(res) if res.status() == StatusCode::OK => Ok(()),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
@@ -275,8 +250,4 @@ impl ComputeNode {
Err(err) => Err(err).context("writing to endpoint storage"),
}
}
/// Cancel the `lfc_prewarm_token` held in the shared compute state.
///
/// The state mutex is held for the duration of the call.
pub fn cancel_prewarm(self: &Arc<Self>) {
    let state = self.state.lock().unwrap();
    state.lfc_prewarm_token.cancel();
}
}

View File

@@ -78,7 +78,7 @@ impl ComputeNode {
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
.query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
@@ -103,7 +103,7 @@ impl ComputeNode {
.await
.context("setting safekeepers")?;
client
.query("SELECT pg_catalog.pg_reload_conf()", &[])
.query("SELECT pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
@@ -113,7 +113,7 @@ impl ComputeNode {
});
let row = client
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
.query_one("SELECT * FROM pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {

View File

@@ -16,7 +16,7 @@ use crate::pg_helpers::{
DatabricksSettingsExt as _, GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize,
escape_conf_value,
};
use crate::tls::{self, SERVER_CRT, SERVER_KEY};
use crate::tls::{SERVER_CRT, SERVER_KEY};
use utils::shard::{ShardIndex, ShardNumber};
@@ -178,14 +178,9 @@ pub fn write_postgres_conf(
}
// tls
if let Some(tls_config) = tls_config {
if tls_config.is_some() {
writeln!(file, "ssl = on")?;
// postgres requires the keyfile to be in a secure file,
// currently too complicated to ensure that at the VM level,
// so we just copy them to another file instead. :shrug:
tls::update_key_path_blocking(pgdata_path, tls_config);
// these are the default, but good to be explicit.
writeln!(file, "ssl_cert_file = '{SERVER_CRT}'")?;
writeln!(file, "ssl_key_file = '{SERVER_KEY}'")?;

View File

@@ -139,15 +139,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
delete:
tags:
- Prewarm
summary: Cancel ongoing LFC prewarm
description: ""
operationId: cancelLfcPrewarm
responses:
202:
description: Prewarm cancelled
/lfc/offload:
post:
@@ -645,7 +636,7 @@ components:
properties:
status:
description: LFC offload status
enum: [not_offloaded, offloading, completed, skipped, failed]
enum: [not_offloaded, offloading, completed, failed]
type: string
error:
description: LFC offload error, if any

View File

@@ -12,8 +12,10 @@ use crate::http::JsonResponse;
/// Check that the compute is currently running.
pub(in crate::http) async fn is_writable(State(compute): State<Arc<ComputeNode>>) -> Response {
let status = compute.get_status();
if status != ComputeStatus::Running {
return JsonResponse::invalid_status(status);
match status {
// If we are running, or just reloading the config, we are ok to write a new config.
ComputeStatus::Running | ComputeStatus::Reloading => {}
_ => return JsonResponse::invalid_status(status),
}
match check_writability(&compute).await {

View File

@@ -27,32 +27,6 @@ pub(in crate::http) async fn configure(
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
};
// XXX: wrap state update under lock in a code block. Otherwise, we will try
// to `Send` `mut state` into the spawned thread below, which will cause
// the following rustc error:
//
// error: future cannot be sent between threads safely
{
let mut state = compute.state.lock().unwrap();
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
// Pass the tracing span to the main thread that performs the startup,
// so that the start_compute operation is considered a child of this
// configure request for tracing purposes.
state.startup_span = Some(tracing::Span::current());
if compute.params.lakebase_mode {
ComputeNode::set_spec(&compute.params, &mut state, pspec);
} else {
state.pspec = Some(pspec);
}
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
drop(state);
}
// Spawn a blocking thread to wait for compute to become Running. This is
// needed to not block the main pool of workers and to be able to serve
// other requests while some particular request is waiting for compute to
@@ -60,6 +34,32 @@ pub(in crate::http) async fn configure(
let c = compute.clone();
let completed = task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
loop {
match state.status {
// ideal state.
ComputeStatus::Empty | ComputeStatus::Running => break,
// we need to wait until reloaded
ComputeStatus::Reloading => {
state = c.state_changed.wait(state).unwrap();
}
// All other cases are unexpected.
_ => return Err(JsonResponse::invalid_status(state.status)),
}
}
// Pass the tracing span to the main thread that performs the startup,
// so that the start_compute operation is considered a child of this
// configure request for tracing purposes.
state.startup_span = Some(tracing::Span::current());
if c.params.lakebase_mode {
ComputeNode::set_spec(&c.params, &mut state, pspec);
} else {
state.pspec = Some(pspec);
}
state.set_status(ComputeStatus::ConfigurationPending, &c.state_changed);
while state.status != ComputeStatus::Running {
state = c.state_changed.wait(state).unwrap();
info!(
@@ -71,7 +71,7 @@ pub(in crate::http) async fn configure(
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {err:?}");
return Err(msg);
return Err(JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, msg));
}
}
@@ -81,7 +81,7 @@ pub(in crate::http) async fn configure(
.unwrap();
if let Err(e) = completed {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
return e;
}
// Return current compute state if everything went well.

View File

@@ -46,8 +46,3 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response {
)
}
}
/// HTTP handler that requests cancellation of an LFC prewarm.
///
/// Calls `cancel_prewarm()` on the compute handle and then always responds
/// with `202 Accepted`; nothing is awaited between the call and the response.
pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode {
compute.cancel_prewarm();
StatusCode::ACCEPTED
}

View File

@@ -99,12 +99,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
);
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route(
"/lfc/prewarm",
get(lfc::prewarm_state)
.post(lfc::prewarm)
.delete(lfc::cancel_prewarm),
)
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))

View File

@@ -19,7 +19,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
.query(
"SELECT datname FROM pg_catalog.pg_database
WHERE datallowconn
AND datconnlimit OPERATOR(pg_catalog.<>) (OPERATOR(pg_catalog.-) 2::pg_catalog.int4)
AND datconnlimit <> - 2
LIMIT 500",
&[],
)
@@ -67,7 +67,7 @@ pub async fn get_installed_extensions(
let extensions: Vec<(String, String, i32)> = client
.query(
"SELECT extname, extversion, extowner::pg_catalog.int4 FROM pg_catalog.pg_extension",
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
&[],
)
.await?

View File

@@ -11,9 +11,11 @@ use utils::pid_file::{self, PidFileRead};
pub fn configure(local_proxy: &LocalProxySpec) -> Result<()> {
write_local_proxy_conf("/etc/local_proxy/config.json".as_ref(), local_proxy)?;
notify_local_proxy("/etc/local_proxy/pid".as_ref())?;
reload()
}
Ok(())
pub fn reload() -> Result<()> {
notify_local_proxy("/etc/local_proxy/pid".as_ref())
}
/// Create or completely rewrite configuration file specified by `path`

View File

@@ -76,7 +76,7 @@ impl<'m> MigrationRunner<'m> {
self.client
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
.await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key pg_catalog.int4 NOT NULL PRIMARY KEY, id pg_catalog.int8 NOT NULL DEFAULT 0)").await?;
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
self.client
.simple_query(
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",

View File

@@ -15,17 +15,17 @@ DO $$
DECLARE
role_name text;
BEGIN
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member')
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, '{privileged_role_name}', 'member')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I INHERIT;', role_name);
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
END LOOP;
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles
FOR role_name IN SELECT rolname FROM pg_roles
WHERE
NOT pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT pg_catalog.starts_with(rolname, 'pg_')
NOT pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT starts_with(rolname, 'pg_')
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I NOBYPASSRLS;', role_name);
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
END LOOP;
END $$;

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name = 'server_version_num') THEN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT pg_create_subscription TO {privileged_role_name}';
END IF;
END $$;

View File

@@ -5,9 +5,9 @@ DO $$
DECLARE
role_name TEXT;
BEGIN
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE rolreplication IS TRUE
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
LOOP
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', pg_catalog.quote_ident(role_name);
EXECUTE pg_catalog.format('ALTER ROLE %I NOREPLICATION;', role_name);
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
END LOOP;
END $$;

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name OPERATOR(pg_catalog.=) 'server_version_num'::pg_catalog.text) THEN
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO {privileged_role_name}';
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO {privileged_role_name}';
END IF;

View File

@@ -2,7 +2,7 @@ DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser';
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;

View File

@@ -4,8 +4,8 @@ DECLARE
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_catalog.pg_roles
WHERE pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
@@ -14,12 +14,12 @@ BEGIN
FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_catalog.pg_roles
WHERE NOT pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT pg_catalog.starts_with(rolname, 'pg_')
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', pg_catalog.quote_ident(role.name);
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
END IF;
END LOOP;
END $$;

View File

@@ -1,10 +1,10 @@
DO $$
BEGIN
IF (SELECT pg_catalog.current_setting('server_version_num')::pg_catalog.numeric < 160000) THEN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
RETURN;
END IF;
IF NOT (SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;

View File

@@ -2,12 +2,12 @@ DO $$
DECLARE
monitor record;
BEGIN
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
admin_option AS admin
INTO monitor
FROM pg_catalog.pg_auth_members
WHERE roleid = 'pg_monitor'::pg_catalog.regrole
AND member = 'neon_superuser'::pg_catalog.regrole;
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'neon_superuser'::regrole;
IF monitor IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';

View File

@@ -2,11 +2,11 @@ DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT pg_catalog.bool_and(pg_catalog.has_function_privilege('neon_superuser', oid, 'execute'))
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
INTO can_execute
FROM pg_catalog.pg_proc
FROM pg_proc
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
AND pronamespace = 'pg_catalog'::pg_catalog.regnamespace;
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
END IF;

View File

@@ -2,9 +2,9 @@ DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT pg_catalog.has_function_privilege('neon_superuser', oid, 'execute')
SELECT has_function_privilege('neon_superuser', oid, 'execute')
INTO can_execute
FROM pg_catalog.pg_proc
FROM pg_proc
WHERE proname = 'pg_show_replication_origin_status'
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN

View File

@@ -2,10 +2,10 @@ DO $$
DECLARE
signal_backend record;
BEGIN
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
admin_option AS admin
INTO signal_backend
FROM pg_catalog.pg_auth_members
FROM pg_auth_members
WHERE roleid = 'pg_signal_backend'::regrole
AND member = 'neon_superuser'::regrole;

View File

@@ -407,9 +407,9 @@ fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
// like `postgres_exporter` use it to query Postgres statistics.
// Use explicit 8 bytes type casts to match Rust types.
let stats = cli.query_one(
"SELECT pg_catalog.coalesce(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time,
pg_catalog.coalesce(pg_catalog.sum(sessions), 0)::pg_catalog.bigint AS total_sessions
FROM pg_catalog.pg_stat_database
"SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
coalesce(sum(sessions), 0)::bigint AS total_sessions
FROM pg_stat_database
WHERE datname NOT IN (
'postgres',
'template0',
@@ -445,11 +445,11 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
let mut last_active: Option<DateTime<Utc>> = None;
// Get all running client backends except ourself, use RFC3339 DateTime format.
let backends = cli.query(
"SELECT state, pg_catalog.to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"'::pg_catalog.text) AS state_change
"SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
FROM pg_stat_activity
WHERE backend_type OPERATOR(pg_catalog.=) 'client backend'::pg_catalog.text
AND pid OPERATOR(pg_catalog.!=) pg_catalog.pg_backend_pid()
AND usename OPERATOR(pg_catalog.!=) 'cloud_admin'::pg_catalog.name;", // XXX: find a better way to filter other monitors?
WHERE backend_type = 'client backend'
AND pid != pg_backend_pid()
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
&[],
);

View File

@@ -299,9 +299,9 @@ pub async fn get_existing_dbs_async(
.query_raw::<str, &String, &[String; 0]>(
"SELECT
datname AS name,
(SELECT rolname FROM pg_catalog.pg_roles WHERE oid OPERATOR(pg_catalog.=) datdba) AS owner,
(SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
NOT datallowconn AS restrict_conn,
datconnlimit OPERATOR(pg_catalog.=) (OPERATOR(pg_catalog.-) 2) AS invalid
datconnlimit = - 2 AS invalid
FROM
pg_catalog.pg_database;",
&[],
@@ -466,13 +466,7 @@ fn update_pgbouncer_ini(
Ok(())
}
/// Tune pgbouncer.
/// 1. Apply new config using pgbouncer admin console
/// 2. Add new values to pgbouncer.ini to preserve them after restart
pub async fn tune_pgbouncer(
mut pgbouncer_config: IndexMap<String, String>,
tls_config: Option<TlsConfig>,
) -> Result<()> {
async fn connect() -> Result<tokio_postgres::Client> {
let pgbouncer_connstr = if std::env::var_os("AUTOSCALING").is_some() {
// for VMs use pgbouncer specific way to connect to
// pgbouncer admin console without password
@@ -518,18 +512,17 @@ pub async fn tune_pgbouncer(
}
};
if let Some(tls_config) = tls_config {
// pgbouncer starts in a half-ok state if it cannot find these files.
// It will default to client_tls_sslmode=deny, which causes proxy to error.
// There is a small window at startup where these files don't yet exist in the VM.
// Best to wait until it exists.
loop {
if let Ok(true) = tokio::fs::try_exists(&tls_config.key_path).await {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await
}
Ok(client)
}
/// Tune pgbouncer.
/// 1. Apply new config to pgbouncer.ini
/// 2. Notify pgbouncer to reload
pub async fn tune_pgbouncer(
mut pgbouncer_config: IndexMap<String, String>,
tls_config: Option<TlsConfig>,
) -> Result<()> {
if let Some(tls_config) = tls_config {
pgbouncer_config.insert("client_tls_cert_file".to_string(), tls_config.cert_path);
pgbouncer_config.insert("client_tls_key_file".to_string(), tls_config.key_path);
pgbouncer_config.insert("client_tls_sslmode".to_string(), "allow".to_string());
@@ -550,10 +543,17 @@ pub async fn tune_pgbouncer(
info!("Applying pgbouncer setting change");
reload_pgbouncer().await
}
/// Reload pgbouncer.
pub async fn reload_pgbouncer() -> Result<()> {
let client = connect().await?;
if let Err(err) = client.simple_query("RELOAD").await {
// Don't fail on error, just print it into log
error!("Failed to apply pgbouncer setting change, {err}",);
};
error!("Failed to apply pgbouncer setting change: {err}",);
}
Ok(())
}

View File

@@ -82,7 +82,7 @@ impl ComputeNode {
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.query("select 1 from neon.drop_subscriptions_done where timeline_id OPERATOR(pg_catalog.=) $1", &[&timeline_id.to_string()]).await {
client.query("select 1 from neon.drop_subscriptions_done where timeline_id = $1", &[&timeline_id.to_string()]).await {
Ok(result) => !result.is_empty(),
Err(e) =>
{
@@ -1142,9 +1142,7 @@ async fn get_operations<'a>(
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
if libs.contains("pg_stat_statements") {
return Ok(Box::new(once(Operation {
query: String::from(
"CREATE EXTENSION IF NOT EXISTS pg_stat_statements WITH SCHEMA public",
),
query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
comment: Some(String::from("create system extensions")),
})));
}
@@ -1152,13 +1150,11 @@ async fn get_operations<'a>(
Ok(Box::new(empty()))
}
ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit WITH SCHEMA public"),
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
comment: Some(String::from("create pgaudit extensions")),
}))),
ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
query: String::from(
"CREATE EXTENSION IF NOT EXISTS pgauditlogtofile WITH SCHEMA public",
),
query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
comment: Some(String::from("create pgauditlogtofile extensions")),
}))),
// Disable pgaudit logging for postgres database.
@@ -1182,7 +1178,7 @@ async fn get_operations<'a>(
},
Operation {
query: String::from(
"UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name AND extrelocatable OPERATOR(pg_catalog.=) false",
"UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
),
comment: Some(String::from("compat/fix: make neon relocatable")),
},

View File

@@ -3,17 +3,16 @@ BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename::pg_catalog.name OPERATOR(pg_catalog.=) 'health_check'::pg_catalog.name
AND schemaname::pg_catalog.name OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
WHERE tablename = 'health_check'
)
THEN
CREATE TABLE public.health_check (
id pg_catalog.int4 primary key generated by default as identity,
updated_at pg_catalog.timestamptz default pg_catalog.now()
CREATE TABLE health_check (
id serial primary key,
updated_at timestamptz default now()
);
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
INSERT INTO health_check VALUES (1, now())
ON CONFLICT (id) DO UPDATE
SET updated_at = pg_catalog.now();
SET updated_at = now();
END IF;
END
$$

View File

@@ -2,11 +2,10 @@ DO $$
DECLARE
query varchar;
BEGIN
FOR query IN
SELECT pg_catalog.format('ALTER FUNCTION %I(%s) OWNER TO {db_owner};', p.oid::regproc, pg_catalog.pg_get_function_identity_arguments(p.oid))
FROM pg_catalog.pg_proc p
WHERE p.pronamespace OPERATOR(pg_catalog.=) 'anon'::regnamespace::oid
LOOP
FOR query IN SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {db_owner};'
FROM pg_proc p
JOIN pg_namespace nsp ON p.pronamespace = nsp.oid
WHERE nsp.nspname = 'anon' LOOP
EXECUTE query;
END LOOP;
END

View File

@@ -1,6 +1,6 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname OPERATOR(pg_catalog.=) '{privileged_role_name}'::pg_catalog.name)
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{privileged_role_name}')
THEN
CREATE ROLE {privileged_role_name} {privileges} IN ROLE pg_read_all_data, pg_write_all_data;
END IF;

View File

@@ -4,14 +4,14 @@ $$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname OPERATOR(pg_catalog.=) 'public'
WHERE nspname = 'public'
) AND
pg_catalog.current_setting('server_version_num')::int OPERATOR(pg_catalog./) 10000 OPERATOR(pg_catalog.>=) 15
current_setting('server_version_num')::int / 10000 >= 15
THEN
IF EXISTS(
SELECT rolname
FROM pg_catalog.pg_roles
WHERE rolname OPERATOR(pg_catalog.=) 'web_access'
WHERE rolname = 'web_access'
)
THEN
GRANT CREATE ON SCHEMA public TO web_access;
@@ -20,7 +20,7 @@ $$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname OPERATOR(pg_catalog.=) 'public'
WHERE nspname = 'public'
)
THEN
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;

View File

@@ -2,17 +2,11 @@ DO ${outer_tag}$
DECLARE
subname TEXT;
BEGIN
LOCK TABLE pg_catalog.pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN
SELECT pg_subscription.subname
FROM pg_catalog.pg_subscription
WHERE subdbid OPERATOR(pg_catalog.=) (
SELECT oid FROM pg_database WHERE datname OPERATOR(pg_catalog.=) {datname_str}::pg_catalog.name
)
LOOP
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
EXECUTE pg_catalog.format('DROP SUBSCRIPTION %I;', subname);
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
END LOOP;
END;
${outer_tag}$;

View File

@@ -3,19 +3,19 @@ BEGIN
IF NOT EXISTS(
SELECT 1
FROM pg_catalog.pg_tables
WHERE tablename OPERATOR(pg_catalog.=) 'drop_subscriptions_done'::pg_catalog.name
AND schemaname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name
WHERE tablename = 'drop_subscriptions_done'
AND schemaname = 'neon'
)
THEN
CREATE TABLE neon.drop_subscriptions_done
(id pg_catalog.int4 primary key generated by default as identity, timeline_id pg_catalog.text);
(id serial primary key, timeline_id text);
END IF;
-- preserve the timeline_id of the last drop_subscriptions run
-- to ensure that the cleanup of a timeline is executed only once.
-- use upsert to avoid the table bloat in case of cascade branching (branch of a branch)
INSERT INTO neon.drop_subscriptions_done VALUES (1, pg_catalog.current_setting('neon.timeline_id'))
INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
ON CONFLICT (id) DO UPDATE
SET timeline_id = pg_catalog.current_setting('neon.timeline_id')::pg_catalog.text;
SET timeline_id = current_setting('neon.timeline_id');
END
$$

View File

@@ -15,15 +15,15 @@ BEGIN
WHERE schema_name IN ('public')
LOOP
FOR grantor IN EXECUTE
pg_catalog.format(
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee OPERATOR(pg_catalog.=) %s',
format(
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee = %s',
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
quote_literal({role_name})
)
LOOP
EXECUTE pg_catalog.format('SET LOCAL ROLE %I', grantor);
EXECUTE format('SET LOCAL ROLE %I', grantor);
revoke_query := pg_catalog.format(
revoke_query := format(
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM %I GRANTED BY %I',
schema,
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`

View File

@@ -5,17 +5,17 @@ DO ${outer_tag}$
IF EXISTS(
SELECT nspname
FROM pg_catalog.pg_namespace
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
WHERE nspname = 'public'
)
THEN
SELECT nspowner::regrole::text
FROM pg_catalog.pg_namespace
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.text
WHERE nspname = 'public'
INTO schema_owner;
IF schema_owner OPERATOR(pg_catalog.=) 'cloud_admin'::pg_catalog.text OR schema_owner OPERATOR(pg_catalog.=) 'zenith_admin'::pg_catalog.text
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'
THEN
EXECUTE pg_catalog.format('ALTER SCHEMA public OWNER TO %I', {db_owner});
EXECUTE format('ALTER SCHEMA public OWNER TO %I', {db_owner});
END IF;
END IF;
END

View File

@@ -3,10 +3,10 @@ DO ${outer_tag}$
IF EXISTS(
SELECT 1
FROM pg_catalog.pg_database
WHERE datname OPERATOR(pg_catalog.=) {datname}::pg_catalog.name
WHERE datname = {datname}
)
THEN
EXECUTE pg_catalog.format('ALTER DATABASE %I is_template false', {datname});
EXECUTE format('ALTER DATABASE %I is_template false', {datname});
END IF;
END
${outer_tag}$;

View File

@@ -3,42 +3,43 @@ use std::{io::Write, os::unix::fs::OpenOptionsExt, path::Path, time::Duration};
use anyhow::{Context, Result, bail};
use compute_api::responses::TlsConfig;
use ring::digest;
use x509_cert::Certificate;
/// Digest of a certificate file's contents.
///
/// Built by hashing the bytes of the file with SHA-256; comparing two values
/// is how a change to the certificate file is detected.
#[derive(Clone, Copy)]
pub struct CertDigest(digest::Digest);
pub async fn watch_cert_for_changes(cert_path: String) -> tokio::sync::watch::Receiver<CertDigest> {
let mut digest = compute_digest(&cert_path).await;
let (tx, rx) = tokio::sync::watch::channel(digest);
tokio::spawn(async move {
while !tx.is_closed() {
let new_digest = compute_digest(&cert_path).await;
if digest.0.as_ref() != new_digest.0.as_ref() {
digest = new_digest;
_ = tx.send(digest);
}
tokio::time::sleep(Duration::from_secs(60)).await
}
});
rx
// Equality for `CertDigest` is defined on the wrapped digest's `as_ref()` view,
// so two values are equal exactly when those views compare equal.
impl PartialEq for CertDigest {
fn eq(&self, other: &Self) -> bool {
self.0.as_ref() == other.0.as_ref()
}
}
async fn compute_digest(cert_path: &str) -> CertDigest {
pub fn wait_until_cert_changed(digest: CertDigest, cert_path: &str) -> CertDigest {
loop {
match try_compute_digest(cert_path).await {
let new_digest = compute_digest(cert_path);
if digest != new_digest {
break new_digest;
}
// Wait a while before checking the certificates.
// We renew on a daily basis, so there's no rush.
std::thread::sleep(Duration::from_secs(60));
}
}
pub fn compute_digest(cert_path: &str) -> CertDigest {
loop {
match try_compute_digest(cert_path) {
Ok(d) => break d,
Err(e) => {
tracing::error!("could not read cert file {e:?}");
tokio::time::sleep(Duration::from_secs(1)).await
std::thread::sleep(Duration::from_secs(1))
}
}
}
}
async fn try_compute_digest(cert_path: &str) -> Result<CertDigest> {
let data = tokio::fs::read(cert_path).await?;
fn try_compute_digest(cert_path: &str) -> Result<CertDigest> {
let data = std::fs::read(cert_path)?;
// SHA-256 is extremely collision resistant, so we can safely assume the digest is unique.
Ok(CertDigest(digest::digest(&digest::SHA256, &data)))
}
@@ -46,28 +47,37 @@ async fn try_compute_digest(cert_path: &str) -> Result<CertDigest> {
/// File name used for postgres's `ssl_cert_file` setting; the certificate is
/// written to this name under the postgres data directory.
pub const SERVER_CRT: &str = "server.crt";
/// File name used for postgres's `ssl_key_file` setting.
pub const SERVER_KEY: &str = "server.key";
pub fn update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) {
/// A certificate and its private key, both held in memory as text.
///
/// Produced by `load_certs_blocking` and written out by
/// `update_key_path_blocking`.
pub struct KeyPair {
/// Contents of the certificate file (`tls_config.cert_path`).
crt: String,
/// Contents of the private key file (`tls_config.key_path`).
key: String,
}
pub fn load_certs_blocking(tls_config: &TlsConfig) -> KeyPair {
loop {
match try_update_key_path_blocking(pg_data, tls_config) {
Ok(()) => break,
match try_load_certs_blocking(tls_config) {
Ok(key_pair) => break key_pair,
Err(e) => {
tracing::error!(error = ?e, "could not create key file");
tracing::error!(error = ?e, "could not load certs");
std::thread::sleep(Duration::from_secs(1))
}
}
}
}
// Postgres requires the keypath be "secure". This means
// 1. Owned by the postgres user.
// 2. Have permission 600.
fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Result<()> {
/// Reads the private key and certificate named by `tls_config` into memory.
///
/// Returns an error if either file cannot be read as a string, or if
/// `verify_key_cert` rejects the pair.
fn try_load_certs_blocking(tls_config: &TlsConfig) -> Result<KeyPair> {
let key = std::fs::read_to_string(&tls_config.key_path)?;
let crt = std::fs::read_to_string(&tls_config.cert_path)?;
// The key and certificate are read with two separate calls, so check that
// they belong together to mitigate a race condition during renewal.
verify_key_cert(&key, &crt)?;
Ok(KeyPair { key, crt })
}
// Postgres requires the keypath be "secure". This means
// 1. Owned by the postgres user.
// 2. Have permission 600.
pub fn update_key_path_blocking(pg_data: &Path, key_pair: &KeyPair) -> Result<()> {
let mut key_file = std::fs::OpenOptions::new()
.write(true)
.create(true)
@@ -82,14 +92,22 @@ fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Resul
.mode(0o600)
.open(pg_data.join(SERVER_CRT))?;
key_file.write_all(key.as_bytes())?;
crt_file.write_all(crt.as_bytes())?;
// NOTE: We currently ensure that an explicit reload does not happen during TLS renewal, but
// there's a chance that postgres/pgbouncer/local_proxy reloads implicitly halfway between
// these writes. This could allow them to read a key that does not match the certificate.
// There doesn't seem to be any way to prevent that. However, we will issue a reload shortly
// after which should at least correct it.
key_file.write_all(key_pair.key.as_bytes())?;
crt_file.write_all(key_pair.crt.as_bytes())?;
Ok(())
}
fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
use x509_cert::Certificate;
use x509_cert::der::oid::db::rfc5912::ECDSA_WITH_SHA_256;
use x509_cert::der::oid::db::rfc8410::ID_ED_25519;
use x509_cert::der::pem;
let certs = Certificate::load_pem_chain(cert.as_bytes())
.context("decoding PEM encoded certificates")?;
@@ -100,22 +118,30 @@ fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
bail!("no certificates found");
};
let pubkey = cert
.tbs_certificate
.subject_public_key_info
.subject_public_key
.raw_bytes();
match cert.signature_algorithm.oid {
ECDSA_WITH_SHA_256 => {
let key = p256::SecretKey::from_sec1_pem(key).context("parse key")?;
let a = key.public_key().to_sec1_bytes();
let b = cert
.tbs_certificate
.subject_public_key_info
.subject_public_key
.raw_bytes();
if *a != *b {
if *key.public_key().to_sec1_bytes() != *pubkey {
bail!("private key file does not match certificate")
}
}
_ => bail!("unknown TLS key type"),
ID_ED_25519 => {
use ring::signature::{Ed25519KeyPair, KeyPair};
let (_, bytes) = pem::decode_vec(key.as_bytes())
.map_err(|_| anyhow::anyhow!("invalid key encoding"))?;
let key = Ed25519KeyPair::from_pkcs8_maybe_unchecked(&bytes).context("parse key")?;
if *key.public_key().as_ref() != *pubkey {
bail!("private key file does not match certificate")
}
}
oid => bail!("unknown TLS key type: {oid}"),
}
Ok(())

View File

@@ -1089,7 +1089,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
storage_controller: None,
control_plane_hooks_api: None,
generate_local_ssl_certs: false,
generate_local_tls_certs: false,
generate_compute_tls_certs: false,
}
};

View File

@@ -23,7 +23,7 @@ impl StorageBroker {
}
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
if self.env.generate_local_tls_certs {
self.env.generate_ssl_cert(
&self.env.storage_broker_data_dir().join("server.crt"),
&self.env.storage_broker_data_dir().join("server.key"),

View File

@@ -54,7 +54,6 @@ use compute_api::requests::{
};
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
@@ -213,8 +212,13 @@ impl ComputeControlPlane {
let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
let compute_ctl_config = ComputeCtlConfig {
jwks: Self::create_jwks_from_pem(&self.env.read_public_key()?)?,
tls: None::<TlsConfig>,
tls: self.env.get_tls_config()?,
};
let mut features = vec![];
if compute_ctl_config.tls.is_some() {
features.push(ComputeFeature::TlsExperimental);
}
let ep = Arc::new(Endpoint {
endpoint_id: endpoint_id.to_owned(),
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
@@ -241,7 +245,7 @@ impl ComputeControlPlane {
drop_subscriptions_before_start,
grpc,
reconfigure_concurrency: 1,
features: vec![],
features: features.clone(),
cluster: None,
compute_ctl_config: compute_ctl_config.clone(),
privileged_role_name: privileged_role_name.clone(),
@@ -263,7 +267,7 @@ impl ComputeControlPlane {
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
features,
cluster: None,
compute_ctl_config,
privileged_role_name,
@@ -953,7 +957,7 @@ impl Endpoint {
}
// keep retrying
}
ComputeStatus::Running => {
ComputeStatus::Reloading | ComputeStatus::Running => {
// All good!
break;
}

View File

@@ -12,6 +12,7 @@ use std::{env, fs};
use anyhow::{Context, bail};
use clap::ValueEnum;
use compute_api::responses::TlsConfig;
use pageserver_api::config::PostHogConfig;
use pem::Pem;
use postgres_backend::AuthType;
@@ -95,7 +96,10 @@ pub struct LocalEnv {
/// Flag to generate SSL certificates for components that need it.
/// Also generates root CA certificate that is used to sign all other certificates.
pub generate_local_ssl_certs: bool,
pub generate_local_tls_certs: bool,
/// Flag to generate SSL certificates for compute.
pub generate_compute_tls_certs: bool,
}
/// On-disk state stored in `.neon/config`.
@@ -123,7 +127,11 @@ pub struct OnDiskConfig {
// Note: skip serializing because in compat tests old storage controller fails
// to load new config file. May be removed after this field is in release branch.
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub generate_local_ssl_certs: bool,
pub generate_local_tls_certs: bool,
// Note: skip serializing because in compat tests old storage controller fails
// to load new config file. May be removed after this field is in release branch.
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub generate_compute_tls_certs: bool,
}
fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
@@ -152,7 +160,8 @@ pub struct NeonLocalInitConf {
pub endpoint_storage: EndpointStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
pub generate_local_tls_certs: bool,
pub generate_compute_tls_certs: bool,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -511,7 +520,7 @@ impl LocalEnv {
}
pub fn ssl_ca_cert_path(&self) -> Option<PathBuf> {
if self.generate_local_ssl_certs {
if self.generate_local_tls_certs {
Some(self.base_data_dir.join("rootCA.crt"))
} else {
None
@@ -519,7 +528,7 @@ impl LocalEnv {
}
pub fn ssl_ca_key_path(&self) -> Option<PathBuf> {
if self.generate_local_ssl_certs {
if self.generate_local_tls_certs {
Some(self.base_data_dir.join("rootCA.key"))
} else {
None
@@ -545,6 +554,33 @@ impl LocalEnv {
)
}
/// Returns the `(certificate, private key)` file paths used for compute TLS,
/// both located directly under `base_data_dir`.
///
/// Yields `None` when `generate_compute_tls_certs` is disabled.
fn compute_ssl_paths(&self) -> Option<(PathBuf, PathBuf)> {
    if !self.generate_compute_tls_certs {
        return None;
    }

    let cert = self.base_data_dir.join("compute_server.crt");
    let key = self.base_data_dir.join("compute_server.key");
    Some((cert, key))
}
/// Makes sure the compute TLS certificate/key pair exists on disk.
///
/// The root CA is generated first (via `generate_ssl_ca_cert`), then the
/// compute certificate is created and signed by that CA unless the
/// certificate file already exists.
///
/// # Errors
///
/// Returns an error (rather than panicking) when compute TLS is disabled or
/// when the local root CA paths are unavailable because
/// `generate_local_tls_certs` is off. Filesystem and certificate-generation
/// failures are propagated as-is.
pub fn generate_compute_ssl_cert(&self) -> anyhow::Result<()> {
    // Resolve every required path up front so a misconfiguration surfaces as
    // a descriptive error before anything is written to disk.
    let (cert_path, key_path) = self
        .compute_ssl_paths()
        .context("compute TLS certificates are disabled (generate_compute_tls_certs is false)")?;
    let ca_cert_path = self
        .ssl_ca_cert_path()
        .context("compute TLS certificates need the local root CA certificate (generate_local_tls_certs is false)")?;
    let ca_key_path = self
        .ssl_ca_key_path()
        .context("compute TLS certificates need the local root CA key (generate_local_tls_certs is false)")?;

    self.generate_ssl_ca_cert()?;

    // Only the certificate file is probed; an existing certificate is kept.
    if !fs::exists(&cert_path)? {
        generate_ssl_cert(
            &cert_path,
            &key_path,
            ca_cert_path.as_path(),
            ca_key_path.as_path(),
        )?;
    }
    Ok(())
}
/// Creates HTTP client with local SSL CA certificates.
pub fn create_http_client(&self) -> reqwest::Client {
let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| {
@@ -673,7 +709,8 @@ impl LocalEnv {
control_plane_hooks_api,
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
generate_local_tls_certs,
generate_compute_tls_certs,
endpoint_storage,
} = on_disk_config;
LocalEnv {
@@ -690,7 +727,8 @@ impl LocalEnv {
control_plane_api: control_plane_api.unwrap(),
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
generate_local_tls_certs,
generate_compute_tls_certs,
endpoint_storage,
}
};
@@ -806,7 +844,8 @@ impl LocalEnv {
control_plane_hooks_api: self.control_plane_hooks_api.clone(),
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
generate_local_tls_certs: self.generate_local_tls_certs,
generate_compute_tls_certs: self.generate_compute_tls_certs,
endpoint_storage: self.endpoint_storage.clone(),
},
)
@@ -861,6 +900,21 @@ impl LocalEnv {
Ok(pem)
}
/// Builds the compute [`TlsConfig`], if compute TLS is enabled.
///
/// When `compute_ssl_paths` yields paths, the certificate files are ensured
/// to exist and both paths are returned as strings; a path that is not valid
/// UTF-8 is an error. When it yields nothing, the result is `Ok(None)`.
pub fn get_tls_config(&self) -> anyhow::Result<Option<TlsConfig>> {
    let Some((cert_path, key_path)) = self.compute_ssl_paths() else {
        return Ok(None);
    };

    self.generate_compute_ssl_cert()?;

    let key_path = key_path.to_str().context("utf8")?.to_string();
    let cert_path = cert_path.to_str().context("utf8")?.to_string();
    Ok(Some(TlsConfig {
        key_path,
        cert_path,
    }))
}
/// Materialize the [`NeonLocalInitConf`] to disk. Called during [`neon_local init`].
pub fn init(conf: NeonLocalInitConf, force: &InitForceMode) -> anyhow::Result<()> {
let base_path = base_path();
@@ -912,7 +966,8 @@ impl LocalEnv {
pageservers,
safekeepers,
control_plane_api,
generate_local_ssl_certs,
generate_local_tls_certs,
generate_compute_tls_certs,
control_plane_hooks_api,
endpoint_storage,
} = conf;
@@ -965,13 +1020,17 @@ impl LocalEnv {
control_plane_api: control_plane_api.unwrap(),
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
generate_local_tls_certs,
generate_compute_tls_certs,
endpoint_storage,
};
if generate_local_ssl_certs {
if generate_local_tls_certs {
env.generate_ssl_ca_cert()?;
}
if generate_compute_tls_certs {
env.generate_compute_ssl_cert()?;
}
// create endpoints dir
fs::create_dir_all(env.endpoints_path())?;

View File

@@ -241,7 +241,7 @@ impl PageServerNode {
.context("write identity toml")?;
drop(identity_toml);
if self.env.generate_local_ssl_certs {
if self.env.generate_local_tls_certs {
self.env.generate_ssl_cert(
datadir.join("server.crt").as_path(),
datadir.join("server.key").as_path(),

View File

@@ -102,7 +102,7 @@ impl SafekeeperNode {
/// Initializes a safekeeper node by creating all necessary files,
/// e.g. SSL certificates and JWT token file.
pub fn initialize(&self) -> anyhow::Result<()> {
if self.env.generate_local_ssl_certs {
if self.env.generate_local_tls_certs {
self.env.generate_ssl_cert(
&self.datadir_path().join("server.crt"),
&self.datadir_path().join("server.key"),

View File

@@ -353,7 +353,7 @@ impl StorageController {
}
}
if self.env.generate_local_ssl_certs {
if self.env.generate_local_tls_certs {
self.env.generate_ssl_cert(
&instance_dir.join("server.crt"),
&instance_dir.join("server.key"),

View File

@@ -1,246 +0,0 @@
# Node deletion API improvement
Created on 2025-07-07
Implemented on _TBD_
## Summary
This RFC describes improvements to the storage controller API for gracefully deleting pageserver
nodes.
## Motivation
The basic node deletion API introduced in [#8226](https://github.com/neondatabase/neon/issues/8333)
has several limitations:
- Deleted nodes can re-add themselves if they restart (e.g., a flaky node that keeps restarting and
we cannot reach via SSH to stop the pageserver). This issue has been resolved by tombstone
mechanism in [#12036](https://github.com/neondatabase/neon/issues/12036)
- The process of node deletion is not graceful, i.e. it just imitates a node failure
In this context, "graceful" node deletion means that users do not experience any disruption or
negative effects, provided the system remains in a healthy state (i.e., the remaining pageservers
can handle the workload and all requirements are met). To achieve this, the system must perform
live migration of all tenant shards from the node being deleted while the node is still running
and continue processing all incoming requests. The node is removed only after all tenant shards
have been safely migrated.
Although live migrations can be achieved with the drain functionality, it leads to incorrect shard
placement, such as not matching availability zones. This results in unnecessary work to optimize
the placement that was just recently performed.
If we delete a node before its tenant shards are fully moved, the new node won't have all the
needed data (e.g. heatmaps) ready. This means user requests to the new node will be much slower at
first. If there are many tenant shards, this slowdown affects a huge number of users.
Graceful node deletion is more complicated and can introduce new issues. It takes longer because
live migration of each tenant shard can last several minutes. Using non-blocking accessors may
also cause deletion to wait if other processes are holding the inner state lock. It also gets trickier
because we need to handle other requests, like drain and fill, at the same time.
## Impacted components (e.g. pageserver, safekeeper, console, etc)
- storage controller
- pageserver (indirectly)
## Proposed implementation
### Tombstones
To resolve the problem of deleted nodes re-adding themselves, a tombstone mechanism was introduced
as part of the node stored information. Each node has a separate `NodeLifecycle` field with two
possible states: `Active` and `Deleted`. When node deletion completes, the database row is not
deleted but instead has its `NodeLifecycle` column switched to `Deleted`. Nodes with `Deleted`
lifecycle are treated as if the row is absent for most handlers, with several exceptions: reattach
and register functionality must be aware of tombstones. Additionally, new debug handlers are
available for listing and deleting tombstones via the `/debug/v1/tombstone` path.
### Gracefulness
The problem of making node deletion graceful is complex and involves several challenges:
- **Cancellable**: The operation must be cancellable to allow administrators to abort the process
if needed, e.g. if run by mistake.
- **Non-blocking**: We don't want to block deployment operations like draining/filling on the node
deletion process. We need clear policies for handling concurrent operations: what happens when a
drain/fill request arrives while deletion is in progress, and what happens when a delete request
arrives while drain/fill is in progress.
- **Persistent**: If the storage controller restarts during this long-running operation, we must
preserve progress and automatically resume the deletion process after the storage controller
restarts.
- **Migrated correctly**: We cannot simply use the existing drain mechanism for nodes scheduled
for deletion, as this would move shards to irrelevant locations. The drain process expects the
node to return, so it only moves shards to backup locations, not to their preferred AZs. It also
leaves secondary locations unmoved. This could result in unnecessary load on the storage
controller and inefficient resource utilization.
- **Force option**: Administrators need the ability to force immediate, non-graceful deletion when
time constraints or emergency situations require it, bypassing the normal graceful migration
process.
See below for a detailed breakdown of the proposed changes and mechanisms.
#### Node lifecycle
New `NodeLifecycle` enum and a matching database field with these values:
- `Active`: The normal state. All operations are allowed.
- `ScheduledForDeletion`: The node is marked to be deleted soon. Deletion may be in progress or
will happen later, but the node will eventually be removed. All operations are allowed.
- `Deleted`: The node is fully deleted. No operations are allowed, and the node cannot be brought
back. The only action left is to remove its record from the database. Any attempt to register a
node in this state will fail.
This state persists across storage controller restarts.
**State transition**
```
+--------------------+
+---| Active |<---------------------+
| +--------------------+ |
| ^ |
| start_node_delete | cancel_node_delete |
v | |
+----------------------------------+ |
| ScheduledForDeletion | |
+----------------------------------+ |
| |
| node_register |
| |
| delete_node (at the finish) |
| |
v |
+---------+ tombstone_delete +----------+
| Deleted |-------------------------------->| no row |
+---------+ +----------+
```
#### NodeSchedulingPolicy::Deleting
A `Deleting` variant is added to the `NodeSchedulingPolicy` enum. This means the deletion function is
running for the node right now. Only one node can have the `Deleting` policy at a time.
The `NodeSchedulingPolicy::Deleting` state is persisted in the database. However, after a storage
controller restart, any node previously marked as `Deleting` will have its scheduling policy reset
to `Pause`. The policy will only transition back to `Deleting` when the deletion operation is
actively started again, as triggered by the node's `NodeLifecycle::ScheduledForDeletion` state.
`NodeSchedulingPolicy` transition details:
1. When `node_delete` begins, set the policy to `NodeSchedulingPolicy::Deleting`.
2. If `node_delete` is cancelled (for example, due to a concurrent drain operation), revert the
policy to its previous value. The policy is persisted in storcon DB.
3. After `node_delete` completes, the final value of the scheduling policy is irrelevant, since
`NodeLifecycle::Deleted` prevents any further access to this field.
The deletion process cannot be initiated for nodes currently undergoing deployment-related
operations (`Draining`, `Filling`, or `PauseForRestart` policies). Deletion will only be triggered
once the node transitions to either the `Active` or `Pause` state.
#### OperationTracker
A replacement for `Option<OperationHandler> ongoing_operation`, the `OperationTracker` is a
dedicated service state object responsible for managing all long-running node operations (drain,
fill, delete) with robust concurrency control.
Key responsibilities:
- Orchestrates the execution of operations
- Supports cancellation of currently running operations
- Enforces operation constraints, e.g. allowing only a single drain/fill operation at a time
- Persists deletion state, enabling recovery of pending deletions across restarts
- Ensures thread safety across concurrent requests
#### Attached tenant shard processing
When deleting a node, handle each attached tenant shard as follows:
1. Pick the best node to become the new attached (the candidate).
2. If the candidate already has this shard as a secondary:
- Create a new secondary for the shard on another suitable node.
Otherwise:
- Create a secondary for the shard on the candidate node.
3. Wait until all secondaries are ready and pre-warmed.
4. Promote the candidate's secondary to attached.
5. Remove the secondary from the node being deleted.
This process safely moves all attached shards before deleting the node.
#### Secondary tenant shard processing
When deleting a node, handle each secondary tenant shard as follows:
1. Choose the best node to become the new secondary.
2. Create a secondary for the shard on that node.
3. Wait until the new secondary is ready.
4. Remove the secondary from the node being deleted.
This ensures all secondary shards are safely moved before deleting the node.
### Reliability, failure modes and corner cases
In case of a storage controller failure and following restart, the system behavior depends on the
`NodeLifecycle` state:
- If `NodeLifecycle` is `Active`: No action is taken for this node.
- If `NodeLifecycle` is `Deleted`: The node will not be re-added.
- If `NodeLifecycle` is `ScheduledForDeletion`: A deletion background task will be launched for
this node.
In case of a pageserver node failure during deletion, the behavior depends on the `force` flag:
- If `force` is set: The node deletion will proceed regardless of the node's availability.
- If `force` is not set: The deletion will be retried a limited number of times. If the node
remains unavailable, the deletion process will pause and automatically resume when the node
becomes healthy again.
### Operations concurrency
The following sections describe the behavior when different types of requests arrive at the storage
controller and how they interact with ongoing operations.
#### Delete request
Handler: `PUT /control/v1/node/:node_id/delete`
1. If node lifecycle is `NodeLifecycle::ScheduledForDeletion`:
- Return `200 OK`: there is already an ongoing deletion request for this node
2. Update & persist lifecycle to `NodeLifecycle::ScheduledForDeletion`
3. Persist current scheduling policy
4. If there is no active operation (drain/fill/delete):
- Run deletion process for this node
#### Cancel delete request
Handler: `DELETE /control/v1/node/:node_id/delete`
1. If node lifecycle is not `NodeLifecycle::ScheduledForDeletion`:
- Return `404 Not Found`: there is no current deletion request for this node
2. If the active operation is deleting this node, cancel it
3. Update & persist lifecycle to `NodeLifecycle::Active`
4. Restore the last scheduling policy from persistence
#### Drain/fill request
1. If there are already ongoing drain/fill processes:
- Return `409 Conflict`: queueing of drain/fill processes is not supported
2. If there is an ongoing delete process:
- Cancel it and wait until it is cancelled
3. Run the drain/fill process
4. After the drain/fill process is cancelled or finished:
- Try to find another candidate to delete and run the deletion process for that node
#### Drain/fill cancel request
1. If the active operation is not the related process:
- Return `400 Bad Request`: cancellation request is incorrect, operations are not the same
2. Cancel the active operation
3. Try to find another candidate to delete and run the deletion process for that node
## Definition of Done
- [x] Fix flaky node scenario and introduce related debug handlers
- [ ] Node deletion intent is persistent - a node will be eventually deleted after a deletion
request regardless of draining/filling requests and restarts
- [ ] Node deletion can be graceful - deletion completes only after moving all tenant shards to
recommended locations
- [ ] Deploying does not break due to long deletions - drain/fill operations override deletion
process and deletion resumes after drain/fill completes
- [ ] `force` flag is implemented and provides fast, failure-tolerant node removal (e.g., when a
pageserver node does not respond)
- [ ] Legacy delete handler code is removed from storage_controller, test_runner, and storcon_cli

View File

@@ -27,7 +27,6 @@ pub struct ComputeConfig {
pub spec: Option<ComputeSpec>,
/// The compute_ctl configuration
#[allow(dead_code)]
pub compute_ctl_config: ComputeCtlConfig,
}
@@ -68,15 +67,11 @@ pub enum LfcPrewarmState {
/// We tried to fetch the corresponding LFC state from the endpoint storage,
/// but received `Not Found 404`. This should normally happen only during the
/// first endpoint start after creation with `autoprewarm: true`.
/// This may also happen if LFC is turned off or not initialized
///
/// During the orchestrated prewarm via API, when a caller explicitly
/// provides the LFC state key to prewarm from, it's the caller's responsibility
/// to handle this status as an error state in this case.
Skipped,
/// LFC prewarm was cancelled. Some pages in LFC cache may be prewarmed if query
/// has started working before cancellation
Cancelled,
}
impl Display for LfcPrewarmState {
@@ -87,7 +82,6 @@ impl Display for LfcPrewarmState {
LfcPrewarmState::Completed => f.write_str("Completed"),
LfcPrewarmState::Skipped => f.write_str("Skipped"),
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
}
}
}
@@ -102,7 +96,6 @@ pub enum LfcOffloadState {
Failed {
error: String,
},
Skipped,
}
#[derive(Serialize, Debug, Clone, PartialEq)]
@@ -161,6 +154,8 @@ pub enum ComputeStatus {
Empty,
// Compute configuration was requested.
ConfigurationPending,
// Postgres, pgbouncer, and local_proxy are currently being reloaded.
Reloading,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
@@ -195,6 +190,7 @@ impl Display for ComputeStatus {
match self {
ComputeStatus::Empty => f.write_str("empty"),
ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
ComputeStatus::Reloading => f.write_str("reloading"),
ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
ComputeStatus::RefreshConfigurationPending => {
f.write_str("refresh-configuration-pending")

View File

@@ -341,34 +341,6 @@ extern "C-unwind" fn log_internal(
}
}
/* BEGIN_HADRON */
/// C-ABI entry point that forwards to
/// [`ApiImpl::reset_safekeeper_statuses_for_metrics`].
///
/// Reads `callback_data` from the walproposer config, treats it as a
/// `*mut Box<dyn ApiImpl>`, and calls the trait method with `wp` and
/// `num_safekeepers`. Does nothing when `callback_data` is null.
///
/// NOTE(review): `wp` and `(*wp).config` are dereferenced without a null
/// check; presumably the C caller always passes valid pointers. Confirm.
/// NOTE(review): the sibling `log_internal` callback is declared
/// `extern "C-unwind"`; confirm plain `extern "C"` is intended here.
extern "C" fn reset_safekeeper_statuses_for_metrics(wp: *mut WalProposer, num_safekeepers: u32) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
// A null `callback_data` means there is no `ApiImpl` to forward to.
if api.is_null() {
return;
}
(*api).reset_safekeeper_statuses_for_metrics(&mut (*wp), num_safekeepers);
}
}
/// C-ABI entry point that forwards to
/// [`ApiImpl::update_safekeeper_status_for_metrics`].
///
/// Reads `callback_data` from the walproposer config, treats it as a
/// `*mut Box<dyn ApiImpl>`, and calls the trait method with `wp`, `sk_index`
/// and `status`. Does nothing when `callback_data` is null.
///
/// NOTE(review): `wp` and `(*wp).config` are dereferenced without a null
/// check; presumably the C caller always passes valid pointers. Confirm.
extern "C" fn update_safekeeper_status_for_metrics(
wp: *mut WalProposer,
sk_index: u32,
status: u8,
) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
// A null `callback_data` means there is no `ApiImpl` to forward to.
if api.is_null() {
return;
}
(*api).update_safekeeper_status_for_metrics(&mut (*wp), sk_index, status);
}
}
/* END_HADRON */
#[derive(Debug, PartialEq)]
pub enum Level {
Debug5,
@@ -442,10 +414,6 @@ pub(crate) fn create_api() -> walproposer_api {
finish_sync_safekeepers: Some(finish_sync_safekeepers),
process_safekeeper_feedback: Some(process_safekeeper_feedback),
log_internal: Some(log_internal),
/* BEGIN_HADRON */
reset_safekeeper_statuses_for_metrics: Some(reset_safekeeper_statuses_for_metrics),
update_safekeeper_status_for_metrics: Some(update_safekeeper_status_for_metrics),
/* END_HADRON */
}
}
@@ -483,8 +451,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
replica_promote: false,
min_ps_feedback: empty_feedback,
wal_rate_limiter: empty_wal_rate_limiter,
num_safekeepers: 0,
safekeeper_status: [0; 32],
}
}

View File

@@ -159,21 +159,6 @@ pub trait ApiImpl {
fn after_election(&self, _wp: &mut WalProposer) {
todo!()
}
/* BEGIN_HADRON */
/// Hook for resetting the safekeeper statuses kept for metrics.
///
/// The default implementation ignores both arguments and does nothing.
fn reset_safekeeper_statuses_for_metrics(&self, _wp: &mut WalProposer, _num_safekeepers: u32) {
// Do nothing for testing purposes.
}
/// Hook for updating the status kept for metrics of the safekeeper at
/// `_sk_index`.
///
/// The default implementation ignores all arguments and does nothing.
fn update_safekeeper_status_for_metrics(
&self,
_wp: &mut WalProposer,
_sk_index: u32,
_status: u8,
) {
// Do nothing for testing purposes.
}
/* END_HADRON */
}
#[derive(Debug)]

View File

@@ -49,7 +49,6 @@
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "neon_utils.h"
#include "pagestore_client.h"
#include "communicator.h"
@@ -674,19 +673,8 @@ lfc_get_state(size_t max_entries)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
{
/* Validate the buffer tag before including it */
BufferTag test_tag = entry->key;
test_tag.blockNum += j;
if (BufferTagIsValid(&test_tag))
{
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
n_pages += 1;
}
else
{
elog(ERROR, "LFC: Skipping invalid buffer tag during cache state capture: blockNum=%u", test_tag.blockNum);
}
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
n_pages += 1;
}
}
if (++i == n_entries)
@@ -695,7 +683,7 @@ lfc_get_state(size_t max_entries)
Assert(i == n_entries);
fcs->n_pages = n_pages;
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
elog(LOG, "LFC: save state of %d chunks %d pages (validated)", (int)n_entries, (int)n_pages);
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
}
LWLockRelease(lfc_lock);
@@ -714,7 +702,6 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
size_t n_entries;
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
size_t fcs_size;
uint32_t max_prefetch_pages;
dsm_segment *seg;
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
@@ -759,11 +746,6 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
Assert(n_entries != 0);
max_prefetch_pages = n_entries << fcs_chunk_size_log;
if (fcs->n_pages > max_prefetch_pages) {
elog(ERROR, "LFC: Number of pages in file cache state (%d) is more than the limit (%d)", fcs->n_pages, max_prefetch_pages);
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
@@ -916,11 +898,6 @@ lfc_prewarm_main(Datum main_arg)
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
if (!BufferTagIsValid(&tag)) {
elog(ERROR, "LFC: Invalid buffer tag: %u", tag.blockNum);
}
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
{
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);

View File

@@ -391,12 +391,6 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
neon_per_backend_counters totals = {0};
metric_t *metrics;
/* BEGIN_HADRON */
WalproposerShmemState *wp_shmem;
uint32 num_safekeepers;
uint32 num_active_safekeepers;
/* END_HADRON */
/* We put all the tuples into a tuplestore in one go. */
InitMaterializedSRF(fcinfo, 0);
@@ -443,32 +437,11 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
// Not ideal but piggyback our databricks counters into the neon perf counters view
// so that we don't need to introduce neon--1.x+1.sql to add a new view.
{
// Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
// the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
// Read safekeeper status from wal proposer shared memory first.
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
// not be a huge deal.
wp_shmem = GetWalpropShmemState();
SpinLockAcquire(&wp_shmem->mutex);
num_safekeepers = wp_shmem->num_safekeepers;
num_active_safekeepers = 0;
for (int i = 0; i < num_safekeepers; i++) {
if (wp_shmem->safekeeper_status[i] == 1) {
num_active_safekeepers++;
}
}
SpinLockRelease(&wp_shmem->mutex);
}
{
metric_t databricks_metrics[] = {
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
{"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
{"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
{NULL, false, 0, 0},
};
for (int i = 0; databricks_metrics[i].name != NULL; i++)

View File

@@ -183,22 +183,3 @@ alloc_curl_handle(void)
}
#endif
/*
 * Report whether every field of a BufferTag holds a valid value.
 *
 * The fork and block numbers are checked the same way on all PostgreSQL
 * versions; the tablespace and relation identifiers live in different
 * members before and after PG 16.
 */
bool
BufferTagIsValid(const BufferTag *tag)
{
	if (tag->forkNum == InvalidForkNumber ||
		tag->blockNum == InvalidBlockNumber)
		return false;

#if PG_MAJORVERSION_NUM >= 16
	return tag->spcOid != InvalidOid &&
		tag->relNumber != InvalidRelFileNumber;
#else
	return tag->rnode.spcNode != InvalidOid &&
		tag->rnode.relNode != InvalidOid;
#endif
}

View File

@@ -2,7 +2,6 @@
#define __NEON_UTILS_H__
#include "lib/stringinfo.h"
#include "storage/buf_internals.h"
#ifndef WALPROPOSER_LIB
#include <curl/curl.h>
@@ -17,9 +16,6 @@ void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void disable_core_dump(void);
/* Buffer tag validation function */
bool BufferTagIsValid(const BufferTag *tag);
#ifndef WALPROPOSER_LIB
CURL * alloc_curl_handle(void);

View File

@@ -154,9 +154,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
wp->safekeeper[wp->n_safekeepers].wp = wp;
/* BEGIN_HADRON */
wp->safekeeper[wp->n_safekeepers].index = wp->n_safekeepers;
/* END_HADRON */
{
Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
int written = 0;
@@ -185,10 +183,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
if (wp->safekeepers_generation > INVALID_GENERATION && wp->config->proto_version < 3)
wp_log(FATAL, "enabling generations requires protocol version 3");
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* BEGIN_HADRON */
wp->api.reset_safekeeper_statuses_for_metrics(wp, wp->n_safekeepers);
/* END_HADRON */
/* Fill the greeting package */
wp->greetRequest.pam.tag = 'g';
@@ -361,10 +355,6 @@ ShutdownConnection(Safekeeper *sk)
sk->state = SS_OFFLINE;
sk->streamingAt = InvalidXLogRecPtr;
/* BEGIN_HADRON */
sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 0);
/* END_HADRON */
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
@@ -1540,10 +1530,6 @@ StartStreaming(Safekeeper *sk)
sk->active_state = SS_ACTIVE_SEND;
sk->streamingAt = sk->startStreamingAt;
/* BEGIN_HADRON */
sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 1);
/* END_HADRON */
/*
* Donors can only be in SS_ACTIVE state, so we potentially update the
* donor when we switch one to SS_ACTIVE.

View File

@@ -432,10 +432,6 @@ typedef struct WalproposerShmemState
/* BEGIN_HADRON */
/* The WAL rate limiter */
WalRateLimiter wal_rate_limiter;
/* Number of safekeepers in the config */
uint32 num_safekeepers;
/* Per-safekeeper status flags: 0=inactive, 1=active */
uint8 safekeeper_status[MAX_SAFEKEEPERS];
/* END_HADRON */
} WalproposerShmemState;
@@ -487,11 +483,6 @@ typedef struct Safekeeper
char const *host;
char const *port;
/* BEGIN_HADRON */
/* index of this safekeeper in the WalProposer array */
uint32 index;
/* END_HADRON */
/*
* connection string for connecting/reconnecting.
*
@@ -740,23 +731,6 @@ typedef struct walproposer_api
* handled by elog().
*/
void (*log_internal) (WalProposer *wp, int level, const char *line);
/*
* BEGIN_HADRON
* APIs manipulating shared memory state used for Safekeeper quorum health metrics.
*/
/*
* Reset the safekeeper statuses in shared memory for metric purposes.
*/
void (*reset_safekeeper_statuses_for_metrics) (WalProposer *wp, uint32 num_safekeepers);
/*
* Update the safekeeper status in shared memory for metric purposes.
*/
void (*update_safekeeper_status_for_metrics) (WalProposer *wp, uint32 sk_index, uint8 status);
/* END_HADRON */
} walproposer_api;
/*

View File

@@ -2261,27 +2261,6 @@ GetNeonCurrentClusterSize(void)
}
uint64 GetNeonCurrentClusterSize(void);
/* BEGIN_HADRON */
/*
 * Store the configured safekeeper count in walproposer shared memory and
 * zero every per-safekeeper status flag, holding the shmem spinlock for
 * both updates.
 */
static void
walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers)
{
	WalproposerShmemState *state = wp->api.get_shmem_state(wp);

	SpinLockAcquire(&state->mutex);
	memset(state->safekeeper_status, 0, sizeof(state->safekeeper_status));
	state->num_safekeepers = num_safekeepers;
	SpinLockRelease(&state->mutex);
}
/*
 * Set the status flag of the safekeeper at sk_index in walproposer shared
 * memory, under the shmem spinlock. The index is only checked by Assert.
 */
static void
walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status)
{
	WalproposerShmemState *state;

	state = wp->api.get_shmem_state(wp);
	Assert(sk_index < MAX_SAFEKEEPERS);

	SpinLockAcquire(&state->mutex);
	state->safekeeper_status[sk_index] = status;
	SpinLockRelease(&state->mutex);
}
/* END_HADRON */
static const walproposer_api walprop_pg = {
.get_shmem_state = walprop_pg_get_shmem_state,
@@ -2315,6 +2294,4 @@ static const walproposer_api walprop_pg = {
.finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers,
.process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
.log_internal = walprop_pg_log_internal,
.reset_safekeeper_statuses_for_metrics = walprop_pg_reset_safekeeper_statuses_for_metrics,
.update_safekeeper_status_for_metrics = walprop_pg_update_safekeeper_status_for_metrics,
};

View File

@@ -700,10 +700,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
ip_allowlist_check_enabled: !args.is_private_access_proxy,
is_vpc_acccess_proxy: args.is_private_access_proxy,
is_auth_broker: args.is_auth_broker,
#[cfg(not(feature = "rest_broker"))]
accept_jwts: args.is_auth_broker,
#[cfg(feature = "rest_broker")]
accept_jwts: args.is_auth_broker || args.is_rest_broker,
console_redirect_confirmation_timeout: args.webauth_confirmation_timeout,
};

View File

@@ -458,7 +458,7 @@ pub(crate) enum LocalProxyConnError {
impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectError(e) => e.get_error_kind(),
HttpConnError::ConnectError(_) => ErrorKind::Compute,
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => match p.as_db_error() {
// user provided a wrong database name

View File

@@ -612,25 +612,19 @@ pub async fn handle_request(
}
}
let max_term = statuses
.iter()
.map(|(status, _)| status.acceptor_state.term)
.max()
.unwrap();
// Find the most advanced safekeeper
let (status, i) = statuses
.into_iter()
.max_by_key(|(status, _)| {
(
status.acceptor_state.epoch,
status.flush_lsn,
/* BEGIN_HADRON */
// We need to pull from the SK with the highest term.
// This is because another compute may come online and vote the same highest term again on the other two SKs.
// Then, there will be 2 computes running on the same term.
status.acceptor_state.term,
/* END_HADRON */
status.flush_lsn,
status.commit_lsn,
)
})
@@ -640,22 +634,6 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
// TODO(diko): This is hadron only check to make sure that we pull the timeline
// from the safekeeper with the highest term during timeline restore.
// We could avoid returning the error by calling bump_term after pull_timeline.
// However, this is not a big deal because we retry the pull_timeline requests.
// The check should be removed together with removing custom hadron logic for
// safekeeper restore.
if wait_for_peer_timeline_status && status.acceptor_state.term != max_term {
return Err(ApiError::PreconditionFailed(
format!(
"choosen safekeeper {} has term {}, but the most advanced term is {}",
safekeeper_host, status.acceptor_state.term, max_term
)
.into(),
));
}
match pull_timeline(
status,
safekeeper_host,

View File

@@ -195,14 +195,12 @@ impl StateSK {
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let result = self.state_mut().membership_switch(to).await?;
let flush_lsn = self.flush_lsn();
let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn);
Ok(TimelineMembershipSwitchResponse {
previous_conf: result.previous_conf,
current_conf: result.current_conf,
last_log_term,
flush_lsn,
last_log_term: self.state().acceptor_state.term,
flush_lsn: self.flush_lsn(),
})
}

View File

@@ -471,17 +471,11 @@ impl Persistence {
&self,
input_node_id: NodeId,
input_https_port: Option<u16>,
input_grpc_addr: Option<String>,
input_grpc_port: Option<u16>,
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.update_node(
input_node_id,
(
listen_https_port.eq(input_https_port.map(|x| x as i32)),
listen_grpc_addr.eq(input_grpc_addr),
listen_grpc_port.eq(input_grpc_port.map(|x| x as i32)),
),
listen_https_port.eq(input_https_port.map(|x| x as i32)),
)
.await
}

View File

@@ -7813,7 +7813,7 @@ impl Service {
register_req.listen_https_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.listen_grpc_addr.clone(),
register_req.listen_grpc_addr,
register_req.listen_grpc_port,
register_req.availability_zone_id.clone(),
self.config.use_https_pageserver_api,
@@ -7848,8 +7848,6 @@ impl Service {
.update_node_on_registration(
register_req.node_id,
register_req.listen_https_port,
register_req.listen_grpc_addr,
register_req.listen_grpc_port,
)
.await?
}

View File

@@ -24,12 +24,12 @@ use pageserver_api::controller_api::{
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::PgVersionId;
use safekeeper_api::Term;
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
use safekeeper_api::models::{
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
TimelineMembershipSwitchResponse,
};
use safekeeper_api::{INITIAL_TERM, Term};
use safekeeper_client::mgmt_api;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -1298,7 +1298,13 @@ impl Service {
)
.await?;
let sync_position = Self::get_sync_position(&results)?;
let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
for res in results.into_iter().flatten() {
let sk_position = (res.last_log_term, res.flush_lsn);
if sync_position < sk_position {
sync_position = sk_position;
}
}
tracing::info!(
%generation,
@@ -1592,36 +1598,4 @@ impl Service {
Ok(())
}
/// Get membership switch responses from all safekeepers and return the sync position.
///
/// Sync position is a position equal or greater than the commit position.
/// It is guaranteed that all WAL entries with (last_log_term, flush_lsn)
/// greater than the sync position are not committed (= not on a quorum).
///
/// Returns error if there is no quorum of successful responses.
fn get_sync_position(
responses: &[mgmt_api::Result<TimelineMembershipSwitchResponse>],
) -> Result<(Term, Lsn), ApiError> {
let quorum_size = responses.len() / 2 + 1;
let mut wal_positions = responses
.iter()
.flatten()
.map(|res| (res.last_log_term, res.flush_lsn))
.collect::<Vec<_>>();
// Should be already checked if the responses are from tenant_timeline_set_membership_quorum.
if wal_positions.len() < quorum_size {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful responses to get sync position: {}/{}",
wal_positions.len(),
quorum_size,
)));
}
wal_positions.sort();
Ok(wal_positions[quorum_size - 1])
}
}

View File

@@ -78,26 +78,20 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self, from_endpoint_id: str | None = None) -> dict[str, str]:
def prewarm_lfc(self, from_endpoint_id: str | None = None):
"""
Prewarm LFC cache from given endpoint and wait till it finishes or errors
"""
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(self.prewarm_url, params=params).raise_for_status()
return self.prewarm_lfc_wait()
self.prewarm_lfc_wait()
def cancel_prewarm_lfc(self):
"""
Cancel LFC prewarm if any is ongoing
"""
self.delete(self.prewarm_url).raise_for_status()
def prewarm_lfc_wait(self) -> dict[str, str]:
def prewarm_lfc_wait(self):
"""
Wait till LFC prewarm returns with error or success.
If prewarm was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped", "cancelled"
statuses = "failed", "completed", "skipped"
def prewarmed():
json = self.prewarm_lfc_status()
@@ -107,7 +101,6 @@ class EndpointHttpClient(requests.Session):
wait_until(prewarmed, timeout=60)
res = self.prewarm_lfc_status()
assert res["status"] != "failed", res
return res
def offload_lfc_status(self) -> dict[str, str]:
res = self.get(self.offload_url)
@@ -115,31 +108,29 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def offload_lfc(self) -> dict[str, str]:
def offload_lfc(self):
"""
Offload LFC cache to endpoint storage and wait till offload finishes or errors
"""
self.post(self.offload_url).raise_for_status()
return self.offload_lfc_wait()
self.offload_lfc_wait()
def offload_lfc_wait(self) -> dict[str, str]:
def offload_lfc_wait(self):
"""
Wait till LFC offload returns with error or success.
If offload was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped"
def offloaded():
json = self.offload_lfc_status()
status, err = json["status"], json.get("error")
assert status in statuses, f"{status}, {err=}"
assert status in ["failed", "completed"], f"{status}, {err=}"
wait_until(offloaded, timeout=60)
res = self.offload_lfc_status()
assert res["status"] != "failed", res
return res
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False) -> dict[str, str]:
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"
if disconnect:
try: # send first request to start promote and disconnect

View File

@@ -79,7 +79,6 @@ class NeonAPI:
elif resp.status_code == 423 and resp.json()["message"] in {
"endpoint is in some transitive state, could not suspend",
"project already has running conflicting operations, scheduling of new ones is prohibited",
"snapshot is in transition",
}:
retry = True
self.retries4xx += 1
@@ -106,7 +105,6 @@ class NeonAPI:
branch_name: str | None = None,
branch_role_name: str | None = None,
branch_database_name: str | None = None,
project_settings: dict[str, Any] | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"project": {
@@ -123,8 +121,6 @@ class NeonAPI:
data["project"]["branch"]["role_name"] = branch_role_name
if branch_database_name:
data["project"]["branch"]["database_name"] = branch_database_name
if project_settings:
data["project"]["settings"] = project_settings
resp = self.__request(
"POST",
@@ -359,63 +355,6 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def create_snapshot(
self,
project_id: str,
branch_id: str,
lsn: str | None = None,
timestamp: str | None = None,
name: str | None = None,
expires_at: str | None = None,
) -> dict[str, Any]:
params: dict[str, Any] = {
"lsn": lsn,
"timestamp": timestamp,
"name": name,
"expires_at": expires_at,
}
params = {key: value for key, value in params.items() if value is not None}
resp = self.__request(
"POST",
f"/projects/{project_id}/branches/{branch_id}/snapshot",
params=params,
json={},
headers={
"Accept": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_snapshot(self, project_id: str, snapshot_id: str) -> dict[str, Any]:
resp = self.__request("DELETE", f"/projects/{project_id}/snapshots/{snapshot_id}")
return cast("dict[str, Any]", resp.json())
def restore_snapshot(
self,
project_id: str,
snapshot_id: str,
target_branch_id: str,
name: str | None = None,
finalize_restore: bool = True,
) -> dict[str, Any]:
data: dict[str, Any] = {
"target_branch_id": target_branch_id,
"finalize_restore": finalize_restore,
}
if name is not None:
data["name"] = name
log.info("Restore snapshot data: %s", data)
resp = self.__request(
"POST",
f"/projects/{project_id}/snapshots/{snapshot_id}/restore",
json=data,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]:
resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}")
return cast("dict[str,Any]", resp.json())
@@ -457,14 +396,6 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def get_branch_endpoints(self, project_id: str, branch_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches/{branch_id}/endpoints",
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return cast("dict[str, Any]", resp.json())
def get_endpoints(self, project_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",

View File

@@ -262,6 +262,7 @@ class PgProtocol:
# pooler does not support statement_timeout
# Check if the hostname contains the string 'pooler'
hostname = result.get("host", "")
log.info(f"Hostname: {hostname}")
options = result.get("options", "")
if "statement_timeout" not in options and "pooler" not in hostname:
options = f"-cstatement_timeout=120s {options}"
@@ -505,6 +506,8 @@ class NeonEnvBuilder:
# Flag to use https listener in storage broker, generate local ssl certs,
# and force pageservers and safekeepers to use https for storage broker api.
self.use_https_storage_broker_api: bool = False
# Flag to enable TLS for computes
self.use_compute_tls: bool = False
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_concurrent_io: str | None = (
@@ -1111,14 +1114,16 @@ class NeonEnv:
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
self.generate_local_ssl_certs = (
self.generate_compute_tls_certs = config.use_compute_tls
self.generate_local_tls_certs = (
config.use_https_pageserver_api
or config.use_https_safekeeper_api
or config.use_https_storage_controller_api
or config.use_https_storage_broker_api
or config.use_compute_tls
)
self.ssl_ca_file = (
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
self.tls_ca_file = (
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_tls_certs else None
)
neon_local_env_vars = {}
@@ -1197,7 +1202,8 @@ class NeonEnv:
"endpoint_storage": {
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
"generate_local_tls_certs": self.generate_local_tls_certs,
"generate_compute_tls_certs": self.generate_compute_tls_certs,
}
if config.use_https_storage_broker_api:
@@ -1941,7 +1947,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.auth_enabled = auth_enabled
self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS
self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log"
self.ssl_ca_file = env.ssl_ca_file
self.tls_ca_file = env.tls_ca_file
def start(
self,
@@ -2014,8 +2020,8 @@ class NeonStorageController(MetricsGetter, LogUtils):
return PageserverHttpClient(self.port, lambda: True, auth_token, *args, **kwargs)
def request(self, method, *args, **kwargs) -> requests.Response:
if self.ssl_ca_file is not None:
kwargs["verify"] = self.ssl_ca_file
if self.tls_ca_file is not None:
kwargs["verify"] = self.tls_ca_file
resp = requests.request(method, *args, **kwargs)
NeonStorageController.raise_api_exception(resp)
@@ -2313,7 +2319,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
timeline_id: TimelineId,
new_sk_set: list[int],
):
log.info(f"migrate_safekeepers({tenant_id}, {timeline_id}, {new_sk_set})")
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate",
@@ -5280,32 +5285,16 @@ class EndpointFactory:
)
def stop_all(self, fail_on_error=True) -> Self:
"""
Stop all the endpoints in parallel.
"""
# Note: raising an exception from a task in a task group cancels
# all the other tasks. We don't want that, hence the 'stop_one'
# function catches exceptions and puts them on the 'exceptions'
# list for later processing.
exceptions = []
async def stop_one(ep):
exception = None
for ep in self.endpoints:
try:
await asyncio.to_thread(ep.stop)
ep.stop()
except Exception as e:
log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
exceptions.append(e)
exception = e
async def async_stop_all():
async with asyncio.TaskGroup() as tg:
for ep in self.endpoints:
tg.create_task(stop_one(ep))
asyncio.run(async_stop_all())
if fail_on_error and exceptions:
raise ExceptionGroup("stopping an endpoint failed", exceptions)
if fail_on_error and exception is not None:
raise exception
return self

View File

@@ -11,7 +11,6 @@ import time
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Any
import psycopg2
import pytest
from fixtures.log_helper import log
@@ -23,29 +22,6 @@ if TYPE_CHECKING:
from fixtures.pg_version import PgVersion
class NeonSnapshot:
"""
A snapshot of the Neon Branch
Gets the output of the API call af a snapshot creation
"""
def __init__(self, project: NeonProject, snapshot: dict[str, Any]):
self.project: NeonProject = project
snapshot = snapshot["snapshot"]
self.id: str = snapshot["id"]
self.name: str = snapshot["name"]
self.created_at: datetime = datetime.fromisoformat(snapshot["created_at"])
self.source_branch: NeonBranch = project.branches[snapshot["source_branch_id"]]
project.snapshots[self.id] = self
self.restored: bool = False
def __str__(self) -> str:
return f"id: {self.id}, name: {self.name}, created_at: {self.created_at}"
def delete(self) -> None:
self.project.delete_snapshot(self.id)
class NeonEndpoint:
"""
Neon Endpoint
@@ -91,21 +67,9 @@ class NeonBranch:
is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call
"""
def __init__(
self,
project,
branch: dict[str, Any],
is_reset=False,
primary_branch: NeonBranch | None = None,
):
def __init__(self, project, branch: dict[str, Any], is_reset=False):
self.id: str = branch["branch"]["id"]
self.desc = branch
self.name: str | None = None
if "name" in branch["branch"]:
self.name = branch["branch"]["name"]
self.restored_from: str | None = None
if "restored_from" in branch["branch"]:
self.restored_from = branch["branch"]["restored_from"]
self.project: NeonProject = project
self.neon_api: NeonAPI = project.neon_api
self.project_id: str = branch["branch"]["project_id"]
@@ -146,36 +110,13 @@ class NeonBranch:
"PGPASSWORD": self.connection_parameters["password"],
"PGSSLMODE": "require",
}
self.replicas: dict[str, NeonBranch] = {}
self.primary_branch: NeonBranch | None = primary_branch
if primary_branch:
if not self.connection_parameters:
raise ValueError(
"connection_parameters is required when primary_branch is specified"
)
self.project.replicas[self.id] = self
primary_branch.replicas[self.id] = self
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"CREATE PUBLICATION {self.id} FOR ALL TABLES")
conn.commit()
with psycopg2.connect(self.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(
f"CREATE SUBSCRIPTION {self.id} CONNECTION '{primary_branch.connstr()}' PUBLICATION {self.id}"
)
conn.commit()
def __str__(self):
"""
Prints the branch's information with all the predecessors
Prints the branch's name with all the predecessors
(r) means the branch is a reset one
"""
name = f"({self.name})" if self.name and self.name != self.id else ""
restored_from = f"(restored_from: {self.restored_from})" if self.restored_from else ""
ancestor = (
f" <- {self.primary_branch}" if self.primary_branch else f", parent: {self.parent}"
)
return f"{self.id}{name}{restored_from}{ancestor}"
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
def random_time(self) -> datetime:
min_time = max(
@@ -187,10 +128,8 @@ class NeonBranch:
log.info("min_time: %s, max_time: %s", min_time, max_time)
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
def create_child_branch(
self, parent_timestamp: datetime | None = None, primary_branch: NeonBranch | None = None
) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp, primary_branch=primary_branch)
def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp)
def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
@@ -213,9 +152,6 @@ class NeonBranch:
self.project.terminate_benchmark(self.id)
def reset_to_parent(self) -> None:
"""
Resets the branch to the parent branch
"""
for ep in self.project.endpoints.values():
if ep.type == "read_only":
ep.terminate_benchmark()
@@ -281,19 +217,6 @@ class NeonBranch:
ep.start_benchmark()
return res
def create_logical_replica(self) -> NeonBranch | None:
if self.primary_branch is not None:
raise RuntimeError("The primary branch cannot be a logical replica")
if self.id in self.project.reset_branches:
raise RuntimeError("Reset branch cannot be a primary branch")
replica = self.create_child_branch(primary_branch=self)
return replica
def connstr(self):
if self.connection_parameters is None:
raise RuntimeError("Connection parameters are not defined")
return " ".join([f"{key}={value}" for key, value in self.connection_parameters.items()])
class NeonProject:
"""
@@ -305,9 +228,7 @@ class NeonProject:
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version,
f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
project_settings={"enable_logical_replication": True},
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]
@@ -319,7 +240,6 @@ class NeonProject:
# Leaf branches are the branches, which do not have children
self.leaf_branches: dict[str, NeonBranch] = {}
self.branches: dict[str, NeonBranch] = {}
self.branch_num: int = 0
self.reset_branches: set[str] = set()
self.main_branch: NeonBranch = NeonBranch(self, proj)
self.main_branch.connection_parameters = self.connection_parameters
@@ -333,9 +253,6 @@ class NeonProject:
self.limits: dict[str, Any] = self.get_limits()["limits"]
self.read_only_endpoints_total: int = 0
self.min_time: datetime = datetime.now(UTC)
self.snapshots: dict[str, NeonSnapshot] = {}
self.snapshot_num: int = 0
self.replicas: dict[str, NeonBranch] = {}
def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
@@ -363,11 +280,7 @@ class NeonProject:
return False
def create_branch(
self,
parent_id: str | None = None,
parent_timestamp: datetime | None = None,
is_reset: bool = False,
primary_branch: NeonBranch | None = None,
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
) -> NeonBranch | None:
self.wait()
if not self.check_limit_branches():
@@ -380,14 +293,14 @@ class NeonProject:
branch_def = self.neon_api.create_branch(
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
)
new_branch = NeonBranch(self, branch_def, is_reset, primary_branch)
new_branch = NeonBranch(self, branch_def)
self.wait()
return new_branch
def delete_branch(self, branch_id: str) -> None:
parent = self.branches[branch_id].parent
if not parent or branch_id == self.main_branch.id:
raise RuntimeError("Cannot delete the main branch or a branch restored from a snapshot")
raise RuntimeError("Cannot delete the main branch")
if branch_id not in self.leaf_branches and branch_id not in self.reset_branches:
raise RuntimeError(f"The branch {branch_id}, probably, has ancestors")
if branch_id not in self.branches:
@@ -400,18 +313,7 @@ class NeonProject:
if branch_id not in self.reset_branches:
self.terminate_benchmark(branch_id)
self.neon_api.delete_branch(self.id, branch_id)
primary_branch = self.branches[branch_id].primary_branch
if primary_branch is not None:
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"DROP PUBLICATION {branch_id}")
conn.commit()
parent.replicas.pop(branch_id)
self.replicas.pop(branch_id)
else:
for replica in self.branches[branch_id].replicas.values():
replica.delete()
if len(parent.children) == 1 and parent.parent is not None:
if len(parent.children) == 1 and parent.id != self.main_branch.id:
self.leaf_branches[parent.id] = parent
parent.children.pop(branch_id)
if branch_id in self.leaf_branches:
@@ -431,26 +333,6 @@ class NeonProject:
log.info("No leaf branches found")
return target
def get_random_parent_branch(self) -> NeonBranch:
return self.branches[
random.choice(
list(set(self.branches.keys()) - self.reset_branches - set(self.replicas.keys()))
)
]
def gen_branch_name(self) -> str:
self.branch_num += 1
return f"branch{self.branch_num}"
def get_random_snapshot(self) -> NeonSnapshot | None:
snapshot: NeonSnapshot | None = None
avail_snapshots = [sn for sn in self.snapshots.values() if not sn.restored]
if avail_snapshots:
snapshot = random.choice(avail_snapshots)
else:
log.info("No snapshots found")
return snapshot
def delete_endpoint(self, endpoint_id: str) -> None:
self.terminate_benchmark(endpoint_id)
self.neon_api.delete_endpoint(self.id, endpoint_id)
@@ -527,116 +409,6 @@ class NeonProject:
self.restore_num += 1
return f"restore{self.restore_num}"
def gen_snapshot_name(self) -> str:
self.snapshot_num += 1
return f"snapshot{self.snapshot_num}"
def create_snapshot(
self,
lsn: str | None = None,
timestamp: datetime | None = None,
) -> NeonSnapshot:
"""
Create a new Neon snapshot for the current project
Two optional arguments: lsn and timestamp are mutually exclusive
they instruct to create a snapshot with the specific lns or timestamp
"""
snapshot_name = self.gen_snapshot_name()
with psycopg2.connect(self.connection_uri) as conn:
with conn.cursor() as cur:
# We will check the value we set now after the snapshot restored to verify consistency
cur.execute(
f"INSERT INTO sanity_check (name, value) VALUES "
f"('snapsot_name', '{snapshot_name}') ON CONFLICT (name) DO UPDATE SET value = EXCLUDED.value"
)
conn.commit()
snapshot = NeonSnapshot(
self,
self.neon_api.create_snapshot(
self.id,
self.main_branch.id,
lsn,
timestamp.isoformat().replace("+00:00", "Z") if timestamp else None,
snapshot_name,
),
)
self.wait()
# Now we taint the value after the snapshot was taken
cur.execute("UPDATE sanity_check SET value = 'tainted' || value")
conn.commit()
return snapshot
def delete_snapshot(self, snapshot_id: str) -> None:
"""
Deletes the snapshot with the given id
"""
self.wait()
self.neon_api.delete_snapshot(self.id, snapshot_id)
self.snapshots.pop(snapshot_id)
self.wait()
def restore_snapshot(self, snapshot_id: str) -> NeonBranch | None:
"""
Creates a new Neon branch for the current project, then restores the snapshot
with the given id
"""
target_branch = self.get_random_parent_branch().create_child_branch()
if not target_branch:
return None
self.snapshots[snapshot_id].restored = True
new_branch_def: dict[str, Any] = self.neon_api.restore_snapshot(
self.id,
snapshot_id,
target_branch.id,
self.gen_branch_name(),
)
self.wait()
new_branch_def = self.neon_api.get_branch_details(self.id, new_branch_def["branch"]["id"])
# The restored branch will lose the parent afterward, but it has it during the restoration.
# So, we delete parent_id
new_branch_def["branch"].pop("parent_id")
new_branch = NeonBranch(self, new_branch_def)
log.info("Restored snapshot to the branch: %s", new_branch)
target_branch_def = self.neon_api.get_branch_details(self.id, target_branch.id)
if "name" in target_branch_def["branch"]:
target_branch.name = target_branch_def["branch"]["name"]
if new_branch.connection_parameters is None:
if not new_branch.endpoints:
for ep in self.neon_api.get_branch_endpoints(self.id, new_branch.id)["endpoints"]:
if ep["id"] not in self.endpoints:
NeonEndpoint(self, ep)
new_branch.connection_parameters = self.connection_parameters.copy()
for ep in new_branch.endpoints.values():
if ep.type == "read_write":
new_branch.connection_parameters["host"] = ep.host
break
new_branch.connect_env = {
"PGHOST": new_branch.connection_parameters["host"],
"PGUSER": new_branch.connection_parameters["role"],
"PGDATABASE": new_branch.connection_parameters["database"],
"PGPASSWORD": new_branch.connection_parameters["password"],
"PGSSLMODE": "require",
}
with psycopg2.connect(
host=new_branch.connection_parameters["host"],
port=5432,
user=new_branch.connection_parameters["role"],
password=new_branch.connection_parameters["password"],
database=new_branch.connection_parameters["database"],
) as conn:
with conn.cursor() as cur:
cur.execute("SELECT value FROM sanity_check WHERE name = 'snapsot_name'")
snapshot_name = None
if row := cur.fetchone():
snapshot_name = row[0]
# We verify here that the value we select from the table matches with the snapshot name
# To ensure consistency
assert snapshot_name == self.snapshots[snapshot_id].name
self.wait()
target_branch.start_benchmark()
new_branch.start_benchmark()
return new_branch
@pytest.fixture()
def setup_class(
@@ -666,7 +438,9 @@ def do_action(project: NeonProject, action: str) -> bool:
if action == "new_branch" or action == "new_branch_random_time":
use_random_time: bool = action == "new_branch_random_time"
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
parent = project.get_random_parent_branch()
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
child = parent.create_child_branch(parent.random_time() if use_random_time else None)
if child is None:
return False
@@ -705,31 +479,6 @@ def do_action(project: NeonProject, action: str) -> bool:
return False
log.info("Reset to parent %s", target)
target.reset_to_parent()
elif action == "create_snapshot":
snapshot = project.create_snapshot()
if snapshot is None:
return False
log.info("Created snapshot %s", snapshot)
elif action == "restore_snapshot":
if (snapshot_to_restore := project.get_random_snapshot()) is None:
return False
log.info("Restoring snapshot %s", snapshot_to_restore)
if project.restore_snapshot(snapshot_to_restore.id) is None:
return False
elif action == "delete_snapshot":
snapshot_to_delete = project.get_random_snapshot()
if snapshot_to_delete is None:
return False
snapshot_to_delete.delete()
log.info("Deleted snapshot %s", snapshot_to_delete)
elif action == "create_logical_replica":
primary: NeonBranch | None = project.get_random_parent_branch()
if primary is None:
return False
replica: NeonBranch | None = primary.create_logical_replica()
if replica is None:
return False
log.info("Created logical replica %s", replica)
else:
raise ValueError(f"The action {action} is unknown")
return True
@@ -763,28 +512,12 @@ def test_api_random(
("delete_branch", 1.2),
("restore_random_time", 0.9),
("reset_to_parent", 0.3),
("create_snapshot", 0.2),
("restore_snapshot", 0.1),
("delete_snapshot", 0.1),
)
if num_ops_env := os.getenv("NUM_OPERATIONS"):
num_operations = int(num_ops_env)
else:
num_operations = 250
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
# Create a table for sanity check
# We are going to leve some control values there to check, e.g., after restoring a snapshot
pg_bin.run(
[
"psql",
"-c",
"CREATE TABLE IF NOT EXISTS sanity_check (name VARCHAR NOT NULL PRIMARY KEY, value VARCHAR)",
],
env=project.main_branch.connect_env,
)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)

View File

@@ -863,6 +863,7 @@ def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder)
assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
@pytest.mark.skip(reason="Lakebase mode")
def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
"""
Test that when the pageserver detects corruption during image layer creation,
@@ -889,9 +890,7 @@ def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
timeline_id = env.initial_timeline
pageserver_http = env.pageserver.http_client()
workload = Workload(
env, tenant_id, timeline_id, endpoint_opts={"config_lines": ["neon.lakebase_mode=true"]}
)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
# Enable the failpoint that will cause image layer creation to fail due to a (simulated) detected

View File

@@ -1,6 +1,6 @@
import random
import threading
from enum import StrEnum
from threading import Thread
from time import sleep
from typing import Any
@@ -47,23 +47,19 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor)
# With autoprewarm, we need to be sure LFC was offloaded after all writes
# finish, so we sleep. Otherwise we'll have less prewarmed pages than we want
sleep(AUTOOFFLOAD_INTERVAL_SECS)
offload_res = client.offload_lfc_wait()
log.info(offload_res)
return offload_res
client.offload_lfc_wait()
return
if method == PrewarmMethod.COMPUTE_CTL:
status = client.prewarm_lfc_status()
assert status["status"] == "not_prewarmed"
assert "error" not in status
offload_res = client.offload_lfc()
log.info(offload_res)
client.offload_lfc()
assert client.prewarm_lfc_status()["status"] == "not_prewarmed"
parsed = prom_parse(client)
desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0}
assert parsed == desired, f"{parsed=} != {desired=}"
return offload_res
return
raise AssertionError(f"{method} not in PrewarmMethod")
@@ -72,30 +68,21 @@ def prewarm_endpoint(
method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor, lfc_state: str | None
):
if method == PrewarmMethod.AUTOPREWARM:
prewarm_res = client.prewarm_lfc_wait()
log.info(prewarm_res)
client.prewarm_lfc_wait()
elif method == PrewarmMethod.COMPUTE_CTL:
prewarm_res = client.prewarm_lfc()
log.info(prewarm_res)
return prewarm_res
client.prewarm_lfc()
elif method == PrewarmMethod.POSTGRES:
cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,))
def check_prewarmed_contains(
def check_prewarmed(
method: PrewarmMethod, client: EndpointHttpClient, desired_status: dict[str, str | int]
):
if method == PrewarmMethod.AUTOPREWARM:
prewarm_status = client.prewarm_lfc_status()
for k in desired_status:
assert desired_status[k] == prewarm_status[k]
assert client.prewarm_lfc_status() == desired_status
assert prom_parse(client)[PREWARM_LABEL] == 1
elif method == PrewarmMethod.COMPUTE_CTL:
prewarm_status = client.prewarm_lfc_status()
for k in desired_status:
assert desired_status[k] == prewarm_status[k]
assert client.prewarm_lfc_status() == desired_status
desired = {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1, PREWARM_ERR_LABEL: 0, OFFLOAD_ERR_LABEL: 0}
assert prom_parse(client) == desired
@@ -162,6 +149,9 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
log.info(f"Used LFC size: {lfc_used_pages}")
pg_cur.execute("select * from neon.get_prewarm_info()")
total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}")
progress = (prewarmed + skipped) * 100 // total
log.info(f"Prewarm progress: {progress}%")
assert lfc_used_pages > 10000
assert total > 0
assert prewarmed > 0
@@ -171,54 +161,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
check_prewarmed_contains(method, client, desired)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prewarm_cancel(neon_simple_env: NeonEnv):
    """
    Check that an in-flight LFC prewarm can be cancelled and that a second
    prewarm issued afterwards runs to completion.
    """
    n_records = 1000000
    settings = [
        "autovacuum = off",
        "shared_buffers=1MB",
        "neon.max_file_cache_size=1GB",
        "neon.file_cache_size_limit=1GB",
        "neon.file_cache_prewarm_limit=1000",
    ]
    compute = neon_simple_env.endpoints.create_start(branch_name="main", config_lines=settings)

    # Admin connection: installs the neon extension and creates the scratch database.
    admin_conn = compute.connect()
    admin_cur = admin_conn.cursor()
    admin_cur.execute("create schema neon; create extension neon with schema neon")
    admin_cur.execute("create database lfc")

    # Populate a table in the scratch database so there is data to offload and prewarm.
    data_conn = compute.connect(dbname="lfc")
    data_cur = data_conn.cursor()
    log.info(f"Inserting {n_records} rows")
    data_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
    data_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
    log.info(f"Inserted {n_records} rows")

    http = compute.http_client()
    method = PrewarmMethod.COMPUTE_CTL
    offload_lfc(method, http, admin_cur)

    # Restart the compute before prewarming.
    compute.stop()
    compute.start()

    # Run the first prewarm in the background so that it can be cancelled while in flight.
    # NOTE(review): admin_cur was opened before the restart; the COMPUTE_CTL path appears
    # to use only the HTTP client, so the stale cursor should not matter — confirm.
    background_prewarm = Thread(target=prewarm_endpoint, args=(method, http, admin_cur, None))
    background_prewarm.start()
    # wait 2 seconds to ensure we cancel prewarm SQL query
    sleep(2)
    http.cancel_prewarm_lfc()
    background_prewarm.join()
    status_after_cancel = http.prewarm_lfc_status()["status"]
    assert status_after_cancel == "cancelled"

    # A fresh prewarm after the cancellation must be reported as completed.
    prewarm_endpoint(method, http, admin_cur, None)
    status_after_retry = http.prewarm_lfc_status()["status"]
    assert status_after_retry == "completed"
check_prewarmed(method, client, desired)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
@@ -235,8 +178,9 @@ def test_lfc_prewarm_empty(neon_simple_env: NeonEnv):
cur = conn.cursor()
cur.execute("create schema neon; create extension neon with schema neon")
method = PrewarmMethod.COMPUTE_CTL
assert offload_lfc(method, client, cur)["status"] == "skipped"
assert prewarm_endpoint(method, client, cur, None)["status"] == "skipped"
offload_lfc(method, client, cur)
prewarm_endpoint(method, client, cur, None)
assert client.prewarm_lfc_status()["status"] == "skipped"
# autoprewarm isn't needed as we prewarm manually
@@ -307,11 +251,11 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
workload_threads = []
for _ in range(n_threads):
t = Thread(target=workload)
t = threading.Thread(target=workload)
workload_threads.append(t)
t.start()
prewarm_thread = Thread(target=prewarm)
prewarm_thread = threading.Thread(target=prewarm)
prewarm_thread.start()
def prewarmed():

View File

@@ -286,177 +286,3 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
# The timeline should remain deleted.
expect_deleted(second_sk)
def test_safekeeper_migration_stale_timeline(neon_env_builder: NeonEnvBuilder):
    """
    Migrate a timeline onto a safekeeper that already holds a stale copy of it.

    Covers three things:
    1. The migration waits for the stale timeline to catch up with the commit LSN,
       and may fail when no compute is running to advance the WAL.
    2. Waiting for the sync_position (step 7) relies on last_log_term rather than
       on the current term.
    3. The migration succeeds once a compute is running.
    """
    neon_env_builder.num_safekeepers = 2
    neon_env_builder.storage_controller_config = {
        "timelines_onto_safekeepers": True,
        "timeline_safekeeper_count": 1,
    }
    env = neon_env_builder.init_start()
    env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
    env.storage_controller.allowed_errors.append(".*not enough successful .* to reach quorum.*")

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline
    storcon = env.storage_controller

    # With timeline_safekeeper_count=1 the timeline lives on exactly one safekeeper;
    # the remaining one becomes the migration target.
    mconf = storcon.timeline_locate(tenant_id, timeline_id)
    active_sk = env.get_safekeeper(mconf["sk_set"][0])
    other_sk = [sk for sk in env.safekeepers if sk.id != active_sk.id][0]

    ep = env.endpoints.create("main", tenant_id=tenant_id)
    ep.start(safekeeper_generation=1, safekeepers=[active_sk.id])
    ep.safe_psql("CREATE TABLE t(a int)")
    ep.safe_psql("INSERT INTO t VALUES (0)")

    # Copy the timeline to other_sk now; every later write makes that copy stale.
    other_sk.pull_timeline([active_sk], tenant_id, timeline_id)

    # Advance the WAL on active_sk only.
    ep.safe_psql("INSERT INTO t VALUES (1)")

    # The interesting case is equal last_log_term with differing term/flush_lsn.
    # Keep active_sk down while the endpoint shuts down: otherwise compute_ctl runs
    # sync_safekeepers and advances last_log_term on active_sk.
    active_sk.stop()
    ep.stop(mode="immediate")
    active_sk.start()

    active_sk_status = active_sk.http_client().timeline_status(tenant_id, timeline_id)
    other_sk_status = other_sk.http_client().timeline_status(tenant_id, timeline_id)

    # Precondition: same last_log_term on both, but other_sk is behind on flush_lsn.
    assert active_sk_status.last_log_term == other_sk_status.last_log_term
    assert active_sk_status.flush_lsn > other_sk_status.flush_lsn

    commit_lsn = active_sk_status.flush_lsn

    # Push other_sk's term above active_sk's, so an algorithm that mistakenly used the
    # current term instead of last_log_term would be caught.
    bumped_term = active_sk_status.term + 100
    other_sk.http_client().term_bump(tenant_id, timeline_id, bumped_term)

    # TODO(diko): this currently fails because the timeline on other_sk is stale and no
    # compute is running to bring it up to date with active_sk. It might be fixed in
    # https://databricks.atlassian.net/browse/LKB-946 if stale timelines are deleted
    # before the migration starts. The rest of the test stays valid either way:
    # committed WAL must not be lost by the migration.
    with pytest.raises(
        StorageControllerApiException, match="not enough successful .* to reach quorum"
    ):
        storcon.migrate_safekeepers(tenant_id, timeline_id, [other_sk.id])

    # The failed attempt leaves the timeline in a joint configuration.
    mconf = storcon.timeline_locate(tenant_id, timeline_id)
    assert mconf["new_sk_set"] == [other_sk.id]
    assert mconf["sk_set"] == [active_sk.id]
    assert mconf["generation"] == 2

    # Start the endpoint, so it advances the WAL on other_sk.
    ep.start(safekeeper_generation=2, safekeepers=[active_sk.id, other_sk.id])

    # Now the migration should succeed.
    storcon.migrate_safekeepers(tenant_id, timeline_id, [other_sk.id])

    # Check that we didn't lose committed WAL.
    migrated_status = other_sk.http_client().timeline_status(tenant_id, timeline_id)
    assert migrated_status.flush_lsn >= commit_lsn

    assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder):
    """
    Check that a safekeeper migration pulls the timeline from the most advanced
    safekeeper, so that committed WAL is not lost.
    """
    neon_env_builder.num_safekeepers = 4
    neon_env_builder.storage_controller_config = {
        "timelines_onto_safekeepers": True,
        "timeline_safekeeper_count": 3,
    }
    env = neon_env_builder.init_start()
    env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    mconf = env.storage_controller.timeline_locate(tenant_id, timeline_id)
    sk_set = mconf["sk_set"]
    assert len(sk_set) == 3

    # The one safekeeper (out of four) that is not part of the initial set.
    other_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]

    ep = env.endpoints.create("main", tenant_id=tenant_id)
    ep.start(safekeeper_generation=1, safekeepers=sk_set)
    ep.safe_psql("CREATE TABLE t(a int)")
    ep.safe_psql("INSERT INTO t VALUES (0)")

    # Stop one sk, so we have a lagging WAL on it.
    lagging_sk = env.get_safekeeper(sk_set[0])
    lagging_sk.stop()

    # Advance the WAL on the other sks.
    ep.safe_psql("INSERT INTO t VALUES (1)")

    # Stop the remaining sks before the endpoint, so that compute_ctl cannot advance
    # last_log_term on them during shutdown.
    for sk_id in sk_set[1:]:
        env.get_safekeeper(sk_id).stop()

    ep.stop(mode="immediate")

    for sk_id in sk_set:
        env.get_safekeeper(sk_id).start()

    # Bump the term on the lagging sk, so that choosing the "most advanced" sk by
    # current term would pick the wrong one.
    env.get_safekeeper(sk_set[0]).http_client().term_bump(tenant_id, timeline_id, 100)

    def get_commit_lsn(sk_set: list[int]):
        """Return the flush_lsn at index len(sk_set) // 2 of the descending-sorted flush_lsns."""
        statuses = [
            env.get_safekeeper(sk_id).http_client().timeline_status(tenant_id, timeline_id)
            for sk_id in sk_set
        ]

        # In this test we assume that all sks have the same last_log_term.
        assert len({status.last_log_term for status in statuses}) == 1

        flush_lsns = sorted((status.flush_lsn for status in statuses), reverse=True)
        commit_lsn = flush_lsns[len(sk_set) // 2]

        log.info(f"sk_set: {sk_set}, flush_lsns: {flush_lsns}, commit_lsn: {commit_lsn}")
        return commit_lsn

    commit_lsn_before_migration = get_commit_lsn(sk_set)

    # Two migrations keep the lagging sk in the set while the other members are replaced.
    new_sk_set1 = [sk_set[0], sk_set[1], other_sk]  # remove sk_set[2], add other_sk
    new_sk_set2 = [sk_set[0], other_sk, sk_set[2]]  # remove sk_set[1], add sk_set[2] back

    for target_set in (new_sk_set1, new_sk_set2):
        env.storage_controller.migrate_safekeepers(tenant_id, timeline_id, target_set)

    commit_lsn_after_migration = get_commit_lsn(new_sk_set2)

    # We should not lose committed WAL.
    # If we had chosen the lagging sk to pull the timeline from, this might fail.
    assert commit_lsn_before_migration <= commit_lsn_after_migration

    # NOTE(review): generation 5 presumably reflects two completed migrations starting
    # from generation 1 — confirm against the storage controller's generation scheme.
    ep.start(safekeeper_generation=5, safekeepers=new_sk_set2)
    assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]

View File

@@ -19,7 +19,7 @@ def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
def test_safekeeper_https_api(neon_env_builder: NeonEnvBuilder):
@@ -37,7 +37,7 @@ def test_safekeeper_https_api(neon_env_builder: NeonEnvBuilder):
# 1. Make simple https request.
addr = f"https://localhost:{sk.port.https}/v1/status"
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
# Note: http_port is intentionally wrong.
# Storcon should not use it if use_https is on.
@@ -83,7 +83,7 @@ def test_storage_controller_https_api(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
addr = f"https://localhost:{env.storage_controller.port}/status"
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
@@ -111,7 +111,7 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
# 1. Check if https works.
addr = f"https://localhost:{port}/v1/status"
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
ps_cert_path = env.pageserver.workdir / "server.crt"
ps_key_path = env.pageserver.workdir / "server.key"
@@ -136,7 +136,7 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
wait_until(error_reloading_cert)
# 4. Check that it uses old cert.
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
cur_cert = ssl.get_server_certificate(("localhost", port))
assert cur_cert == ps_cert
@@ -150,7 +150,7 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
wait_until(cert_reloaded)
# 6. Check that server returns new cert.
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
cur_cert = ssl.get_server_certificate(("localhost", port))
assert cur_cert == sk_cert
@@ -174,7 +174,7 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
)
addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
new_https_conn_count = (
ps_client.get_metric_value("http_server_connection_started_total", filter_https) or 0
@@ -227,10 +227,27 @@ def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder):
# 1. Simple check that HTTPS is enabled and works.
url = env.broker.client_url() + "/status"
assert url.startswith("https://")
requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status()
requests.get(url, verify=str(env.tls_ca_file)).raise_for_status()
# 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS.
workload = Workload(env, env.initial_tenant, env.initial_timeline)
workload.init()
workload.write_rows(10)
workload.validate()
def test_compute_tls(neon_env_builder: NeonEnvBuilder):
    """
    Start a compute with TLS enabled and check that a client connecting with
    sslmode=verify-full against the environment's CA file gets an SSL session.
    """
    neon_env_builder.use_compute_tls = True
    env = neon_env_builder.init_start()

    branch = "test_compute_tls"
    env.create_branch(branch)

    # pg_stat_ssl reports, for the current backend, whether its connection uses SSL.
    tls_query = "select ssl from pg_stat_ssl where pid = pg_backend_pid();"

    with env.endpoints.create_start(branch) as endpoint:
        rows = endpoint.safe_psql(
            tls_query,
            sslmode="verify-full",
            sslrootcert=env.tls_ca_file,
        )
        assert rows == [(True,)]

View File

@@ -2757,37 +2757,18 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
remote_storage_kind = s3_storage()
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
# Set a very small disk usage limit (1KB)
neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"]
env = neon_env_builder.init_start()
# Create a timeline and endpoint
env.create_branch("test_timeline_disk_usage_limit")
endpoint = env.endpoints.create_start(
"test_timeline_disk_usage_limit",
config_lines=[
"neon.lakebase_mode=true",
],
)
# Install the neon extension in the test database. We need it to query perf counter metrics.
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS neon")
# Sanity-check safekeeper connection status in neon_perf_counters in the happy case.
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 active safekeeper"
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
# Get the safekeeper
sk = env.safekeepers[0]
# Restart the safekeeper with a very small disk usage limit (1KB)
sk.stop().start(["--max-timeline-disk-usage-bytes=1024"])
# Inject a failpoint to stop WAL backup
with sk.http_client() as http_cli:
http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
@@ -2813,18 +2794,6 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
wait_until(error_logged)
log.info("Found expected error message in compute log, resuming.")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Confirm that neon_perf_counters also indicates that there are no active safekeepers
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
)
assert cur.fetchone() == (0,), "Expected 0 active safekeepers"
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
# implemented didn't work as expected.
time.sleep(2)

Some files were not shown because too many files have changed in this diff Show More