mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 23:50:39 +00:00
Compare commits
12 Commits
amasterov/
...
conrad/fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cc66f78d01 | ||
|
|
f9e6802974 | ||
|
|
74afc9d96f | ||
|
|
86fe3150f0 | ||
|
|
52be0146d3 | ||
|
|
a3f2a2cae5 | ||
|
|
a24a0032ad | ||
|
|
70cb02742a | ||
|
|
a845295cb3 | ||
|
|
e288cd2198 | ||
|
|
ffa9e595b8 | ||
|
|
e7b1f63f68 |
@@ -1 +1 @@
|
||||
SELECT num_requested AS checkpoints_req FROM pg_catalog.pg_stat_checkpointer;
|
||||
SELECT num_requested AS checkpoints_req FROM pg_stat_checkpointer;
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT checkpoints_req FROM pg_catalog.pg_stat_bgwriter;
|
||||
SELECT checkpoints_req FROM pg_stat_bgwriter;
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT checkpoints_timed FROM pg_catalog.pg_stat_bgwriter;
|
||||
SELECT checkpoints_timed FROM pg_stat_bgwriter;
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT (neon.backpressure_throttling_time()::pg_catalog.float8 / 1000000) AS throttled;
|
||||
SELECT (neon.backpressure_throttling_time()::float8 / 1000000) AS throttled;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
SELECT CASE
|
||||
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_replay_lsn() - '0/0')::pg_catalog.FLOAT8
|
||||
ELSE (pg_catalog.pg_current_wal_lsn() - '0/0')::pg_catalog.FLOAT8
|
||||
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_replay_lsn() - '0/0')::FLOAT8
|
||||
ELSE (pg_current_wal_lsn() - '0/0')::FLOAT8
|
||||
END AS lsn;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
SELECT
|
||||
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
|
||||
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
|
||||
-- These temporary snapshot files are renamed to the actual snapshot files
|
||||
-- after they are completely built. We only WAL-log the completely built
|
||||
-- snapshot files
|
||||
(SELECT COUNT(*) FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
SELECT
|
||||
(SELECT pg_catalog.current_setting('neon.timeline_id')) AS timeline_id,
|
||||
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
|
||||
-- These temporary snapshot files are renamed to the actual snapshot files
|
||||
-- after they are completely built. We only WAL-log the completely built
|
||||
-- snapshot files
|
||||
(SELECT COALESCE(pg_catalog.sum(size), 0) FROM pg_catalog.pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
|
||||
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
SELECT
|
||||
(SELECT setting FROM pg_catalog.pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
|
||||
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
|
||||
-- These temporary snapshot files are renamed to the actual snapshot files
|
||||
-- after they are completely built. We only WAL-log the completely built
|
||||
-- snapshot files
|
||||
(SELECT COALESCE(pg_catalog.sum((pg_catalog.pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
|
||||
FROM (SELECT * FROM pg_catalog.pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
|
||||
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
|
||||
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
|
||||
) AS logical_snapshots_bytes;
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT pg_catalog.current_setting('max_connections') AS max_connections;
|
||||
SELECT current_setting('max_connections') as max_connections;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
SELECT datname database_name,
|
||||
pg_catalog.age(datfrozenxid) frozen_xid_age
|
||||
FROM pg_catalog.pg_database
|
||||
age(datfrozenxid) frozen_xid_age
|
||||
FROM pg_database
|
||||
ORDER BY frozen_xid_age DESC LIMIT 10;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
SELECT datname database_name,
|
||||
pg_catalog.mxid_age(datminmxid) min_mxid_age
|
||||
FROM pg_catalog.pg_database
|
||||
mxid_age(datminmxid) min_mxid_age
|
||||
FROM pg_database
|
||||
ORDER BY min_mxid_age DESC LIMIT 10;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
SELECT CASE
|
||||
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_catalog.pg_last_wal_receive_lsn() - '0/0')::pg_catalog.FLOAT8
|
||||
WHEN pg_catalog.pg_is_in_recovery() THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
|
||||
ELSE 0
|
||||
END AS lsn;
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT subenabled::pg_catalog.text AS enabled, pg_catalog.count(*) AS subscriptions_count FROM pg_catalog.pg_subscription GROUP BY subenabled;
|
||||
SELECT subenabled::text AS enabled, count(*) AS subscriptions_count FROM pg_subscription GROUP BY subenabled;
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT datname, state, pg_catalog.count(*) AS count FROM pg_catalog.pg_stat_activity WHERE state <> '' GROUP BY datname, state;
|
||||
SELECT datname, state, count(*) AS count FROM pg_stat_activity WHERE state <> '' GROUP BY datname, state;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
SELECT pg_catalog.sum(pg_catalog.pg_database_size(datname)) AS total
|
||||
FROM pg_catalog.pg_database
|
||||
SELECT sum(pg_database_size(datname)) AS total
|
||||
FROM pg_database
|
||||
-- Ignore invalid databases, as we will likely have problems with
|
||||
-- getting their size from the Pageserver.
|
||||
WHERE datconnlimit != -2;
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
-- minutes.
|
||||
|
||||
SELECT
|
||||
x::pg_catalog.text AS duration_seconds,
|
||||
x::text as duration_seconds,
|
||||
neon.approximate_working_set_size_seconds(x) AS size
|
||||
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
|
||||
SELECT
|
||||
x AS duration,
|
||||
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::pg_catalog.interval)::pg_catalog.int4) AS size FROM (
|
||||
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
|
||||
VALUES ('5m'), ('15m'), ('1h')
|
||||
) AS t (x);
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT pg_catalog.pg_size_bytes(pg_catalog.current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;
|
||||
SELECT pg_size_bytes(current_setting('neon.file_cache_size_limit')) AS lfc_cache_size_limit;
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
SELECT slot_name, (restart_lsn - '0/0')::pg_catalog.FLOAT8 AS restart_lsn
|
||||
FROM pg_catalog.pg_replication_slots
|
||||
SELECT slot_name, (restart_lsn - '0/0')::FLOAT8 as restart_lsn
|
||||
FROM pg_replication_slots
|
||||
WHERE slot_type = 'logical';
|
||||
|
||||
@@ -1 +1 @@
|
||||
SELECT setting::pg_catalog.int4 AS max_cluster_size FROM pg_catalog.pg_settings WHERE name = 'neon.max_cluster_size';
|
||||
SELECT setting::int AS max_cluster_size FROM pg_settings WHERE name = 'neon.max_cluster_size';
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
-- We export stats for 10 non-system databases. Without this limit it is too
|
||||
-- easy to abuse the system by creating lots of databases.
|
||||
|
||||
SELECT pg_catalog.pg_database_size(datname) AS db_size,
|
||||
SELECT pg_database_size(datname) AS db_size,
|
||||
deadlocks,
|
||||
tup_inserted AS inserted,
|
||||
tup_updated AS updated,
|
||||
tup_deleted AS deleted,
|
||||
datname
|
||||
FROM pg_catalog.pg_stat_database
|
||||
FROM pg_stat_database
|
||||
WHERE datname IN (
|
||||
SELECT datname FROM pg_database
|
||||
-- Ignore invalid databases, as we will likely have problems with
|
||||
|
||||
@@ -3,4 +3,4 @@
|
||||
-- replay LSN may have advanced past the receive LSN we are using for the
|
||||
-- calculation.
|
||||
|
||||
SELECT GREATEST(0, pg_catalog.pg_wal_lsn_diff(pg_catalog.pg_last_wal_receive_lsn(), pg_catalog.pg_last_wal_replay_lsn())) AS replication_delay_bytes;
|
||||
SELECT GREATEST(0, pg_wal_lsn_diff(pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn())) AS replication_delay_bytes;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
SELECT
|
||||
CASE
|
||||
WHEN pg_catalog.pg_last_wal_receive_lsn() = pg_catalog.pg_last_wal_replay_lsn() THEN 0
|
||||
ELSE GREATEST(0, EXTRACT (EPOCH FROM pg_catalog.now() - pg_catalog.pg_last_xact_replay_timestamp()))
|
||||
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn() THEN 0
|
||||
ELSE GREATEST(0, EXTRACT (EPOCH FROM now() - pg_last_xact_replay_timestamp()))
|
||||
END AS replication_delay_seconds;
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
SELECT
|
||||
slot_name,
|
||||
pg_catalog.pg_wal_lsn_diff(
|
||||
pg_wal_lsn_diff(
|
||||
CASE
|
||||
WHEN pg_catalog.pg_is_in_recovery() THEN pg_catalog.pg_last_wal_replay_lsn()
|
||||
ELSE pg_catalog.pg_current_wal_lsn()
|
||||
WHEN pg_is_in_recovery() THEN pg_last_wal_replay_lsn()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END,
|
||||
restart_lsn)::pg_catalog.FLOAT8 AS retained_wal
|
||||
FROM pg_catalog.pg_replication_slots
|
||||
restart_lsn)::FLOAT8 AS retained_wal
|
||||
FROM pg_replication_slots
|
||||
WHERE active = false;
|
||||
|
||||
@@ -4,4 +4,4 @@ SELECT
|
||||
WHEN wal_status = 'lost' THEN 1
|
||||
ELSE 0
|
||||
END AS wal_is_lost
|
||||
FROM pg_catalog.pg_replication_slots;
|
||||
FROM pg_replication_slots;
|
||||
|
||||
@@ -57,6 +57,9 @@ stateDiagram-v2
|
||||
RefreshConfigurationPending --> RefreshConfiguration: Received compute spec and started configuration
|
||||
RefreshConfiguration --> Running : Compute has been re-configured
|
||||
RefreshConfiguration --> RefreshConfigurationPending : Configuration failed and to be retried
|
||||
Running --> Reloading : Local changes (TLS certificate renewal) were detected and postgres is being reloaded
|
||||
Reloading --> Running : Postgres was reloaded
|
||||
Reloading --> Failed : Failed to reload postgres
|
||||
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
|
||||
TerminationPendingImmediate --> Terminated : Terminated compute immediately
|
||||
Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
|
||||
|
||||
@@ -279,7 +279,7 @@ fn main() -> Result<()> {
|
||||
config,
|
||||
)?;
|
||||
|
||||
let exit_code = compute_node.run().context("running compute node")?;
|
||||
let exit_code = compute_node.run()?;
|
||||
|
||||
scenario.teardown();
|
||||
|
||||
|
||||
@@ -24,9 +24,9 @@ pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||
});
|
||||
|
||||
let query = "
|
||||
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
|
||||
INSERT INTO health_check VALUES (1, now())
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET updated_at = pg_catalog.now();";
|
||||
SET updated_at = now();";
|
||||
|
||||
match client.simple_query(query).await {
|
||||
Result::Ok(result) => {
|
||||
|
||||
@@ -28,16 +28,12 @@ use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Condvar, Mutex, RwLock};
|
||||
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{env, fs};
|
||||
use tokio::{spawn, sync::watch, task::JoinHandle, time};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, debug, error, info, instrument, warn};
|
||||
use url::Url;
|
||||
use utils::backoff::{
|
||||
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration,
|
||||
};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
@@ -61,7 +57,6 @@ use crate::rsyslog::{
|
||||
use crate::spec::*;
|
||||
use crate::swap::resize_swap;
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
use crate::tls::watch_cert_for_changes;
|
||||
use crate::{config, extension_server, local_proxy};
|
||||
|
||||
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
|
||||
@@ -196,7 +191,6 @@ pub struct ComputeState {
|
||||
pub startup_span: Option<tracing::span::Span>,
|
||||
|
||||
pub lfc_prewarm_state: LfcPrewarmState,
|
||||
pub lfc_prewarm_token: CancellationToken,
|
||||
pub lfc_offload_state: LfcOffloadState,
|
||||
|
||||
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
|
||||
@@ -222,7 +216,6 @@ impl ComputeState {
|
||||
lfc_offload_state: LfcOffloadState::default(),
|
||||
terminate_flush_lsn: None,
|
||||
promote_state: None,
|
||||
lfc_prewarm_token: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -589,7 +582,7 @@ impl ComputeNode {
|
||||
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
|
||||
// Unset them via connection string options before connecting to the database.
|
||||
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0 -c pgaudit.log=none";
|
||||
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0 -c pgaudit.log=none";
|
||||
let options = match conn_conf.get_options() {
|
||||
// Allow the control plane to override any options set by the
|
||||
// compute
|
||||
@@ -848,14 +841,11 @@ impl ComputeNode {
|
||||
let mut pre_tasks = tokio::task::JoinSet::new();
|
||||
|
||||
// Make sure TLS certificates are properly loaded and in the right place.
|
||||
if self.compute_ctl_config.tls.is_some() {
|
||||
let tls_task = self.compute_ctl_config.tls.as_ref().map(|tls_config| {
|
||||
let this = self.clone();
|
||||
pre_tasks.spawn(async move {
|
||||
this.watch_cert_for_changes().await;
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
});
|
||||
}
|
||||
let tls_config = tls_config.clone();
|
||||
tokio::task::spawn_blocking(|| this.watch_cert_for_changes(tls_config))
|
||||
});
|
||||
|
||||
let tls_config = self.tls_config(&pspec.spec);
|
||||
|
||||
@@ -910,6 +900,13 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for TLS certificates to be issued before updating pgbouncer and local proxy.
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
if let Some(tls_task) = tls_task {
|
||||
rt.block_on(tls_task)
|
||||
.context("TLS certificate renewal task panicked")?;
|
||||
}
|
||||
|
||||
// tune pgbouncer
|
||||
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
|
||||
info!("tuning pgbouncer");
|
||||
@@ -992,7 +989,6 @@ impl ComputeNode {
|
||||
let _configurator_handle = launch_configurator(self);
|
||||
|
||||
// Wait for all the pre-tasks to finish before starting postgres
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
while let Some(res) = rt.block_on(pre_tasks.join_next()) {
|
||||
res??;
|
||||
}
|
||||
@@ -1560,41 +1556,6 @@ impl ComputeNode {
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
fn sync_safekeepers_with_retries(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
|
||||
let max_retries = 5;
|
||||
let mut attempts = 0;
|
||||
loop {
|
||||
let result = self.sync_safekeepers(storage_auth_token.clone());
|
||||
match &result {
|
||||
Ok(_) => {
|
||||
if attempts > 0 {
|
||||
tracing::info!("sync_safekeepers succeeded after {attempts} retries");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
Err(e) if attempts < max_retries => {
|
||||
tracing::info!(
|
||||
"sync_safekeepers failed, will retry (attempt {attempts}): {e:#}"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"sync_safekeepers still failed after {attempts} retries, giving up: {err:?}"
|
||||
);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
// sleep and retry
|
||||
let backoff = exponential_backoff_duration(
|
||||
attempts,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
);
|
||||
std::thread::sleep(backoff);
|
||||
attempts += 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip_all)]
|
||||
@@ -1630,7 +1591,7 @@ impl ComputeNode {
|
||||
lsn
|
||||
} else {
|
||||
info!("starting safekeepers syncing");
|
||||
self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone())
|
||||
self.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?
|
||||
};
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
@@ -1925,7 +1886,7 @@ impl ComputeNode {
|
||||
|
||||
// It doesn't matter what were the options before, here we just want
|
||||
// to connect and create a new superuser role.
|
||||
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path='' -c statement_timeout=0";
|
||||
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
|
||||
zenith_admin_conf.options(ZENITH_OPTIONS);
|
||||
|
||||
let mut client =
|
||||
@@ -1990,10 +1951,7 @@ impl ComputeNode {
|
||||
.clone(),
|
||||
);
|
||||
|
||||
let mut tls_config = None::<TlsConfig>;
|
||||
if spec.features.contains(&ComputeFeature::TlsExperimental) {
|
||||
tls_config = self.compute_ctl_config.tls.clone();
|
||||
}
|
||||
let tls_config = self.tls_config(&spec);
|
||||
|
||||
self.update_installed_extensions_collection_interval(&spec);
|
||||
|
||||
@@ -2175,6 +2133,60 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tell postgres/pgbouncer/local_proxy to reload their configurations.
|
||||
#[instrument(skip_all)]
|
||||
pub fn reload(&self, spec: ComputeSpec) -> Result<()> {
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
if spec.pgbouncer_settings.is_some() {
|
||||
rt.block_on(reload_pgbouncer())?;
|
||||
}
|
||||
if spec.local_proxy_config.is_some() {
|
||||
local_proxy::reload()?;
|
||||
}
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
|
||||
info!("finished reload of compute node for operation {op_id}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Acquire the "reloading" lock while running the supplied function.
|
||||
///
|
||||
/// This ensures that this thread is the only thread that
|
||||
/// can issue signals to postgres.
|
||||
///
|
||||
/// If the supplied function errors, the compute status is marked as failed.
|
||||
pub fn lock_while_reloading<T>(
|
||||
&self,
|
||||
mut state: MutexGuard<'_, ComputeState>,
|
||||
f: impl FnOnce(ComputeSpec) -> Result<T>,
|
||||
) -> Result<T> {
|
||||
let old_status = state.status;
|
||||
|
||||
// transition to the reloading state.
|
||||
state.set_status(ComputeStatus::Reloading, &self.state_changed);
|
||||
let spec = state.pspec.as_ref().unwrap().spec.clone();
|
||||
// unlock while reloading, so we don't block other tasks.
|
||||
drop(state);
|
||||
|
||||
let res = f(spec);
|
||||
|
||||
let new_status = if res.is_ok() {
|
||||
old_status
|
||||
} else {
|
||||
ComputeStatus::Failed
|
||||
};
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
// make sure our invariants are upheld
|
||||
assert_eq!(state.status, ComputeStatus::Reloading);
|
||||
state.set_status(new_status, &self.state_changed);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
@@ -2209,57 +2221,103 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn watch_cert_for_changes(self: Arc<Self>) {
|
||||
// update status on cert renewal
|
||||
if let Some(tls_config) = &self.compute_ctl_config.tls {
|
||||
let tls_config = tls_config.clone();
|
||||
pub fn watch_cert_for_changes(self: Arc<Self>, tls_config: TlsConfig) {
|
||||
// wait until the cert exists.
|
||||
let mut digest = crate::tls::compute_digest(&tls_config.cert_path);
|
||||
info!(
|
||||
cert_path = tls_config.cert_path,
|
||||
key_path = tls_config.key_path,
|
||||
"TLS certificates found"
|
||||
);
|
||||
|
||||
// wait until the cert exists.
|
||||
let mut cert_watch = watch_cert_for_changes(tls_config.cert_path.clone()).await;
|
||||
// ensure the keys are saved before continuing.
|
||||
let key_pair = crate::tls::load_certs_blocking(&tls_config);
|
||||
while let Err(e) =
|
||||
crate::tls::update_key_path_blocking(Path::new(&self.params.pgdata), &key_pair)
|
||||
{
|
||||
error!("could not save TLS certificates: {e}");
|
||||
std::thread::sleep(Duration::from_millis(20));
|
||||
}
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
'cert_update: loop {
|
||||
// let postgres/pgbouncer/local_proxy know the new cert/key exists.
|
||||
// we need to wait until it's configurable first.
|
||||
tokio::task::spawn_blocking(move || {
|
||||
'cert_update: loop {
|
||||
// wait for a new certificate update
|
||||
let new_digest = crate::tls::wait_until_cert_changed(digest, &tls_config.cert_path);
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
'status_update: loop {
|
||||
match state.status {
|
||||
// let's update the state to config pending
|
||||
ComputeStatus::ConfigurationPending | ComputeStatus::Running => {
|
||||
state.set_status(
|
||||
ComputeStatus::ConfigurationPending,
|
||||
&self.state_changed,
|
||||
);
|
||||
break 'status_update;
|
||||
}
|
||||
// load the corresponding keys
|
||||
let key_pair = crate::tls::load_certs_blocking(&tls_config);
|
||||
|
||||
// exit loop
|
||||
ComputeStatus::Failed
|
||||
| ComputeStatus::TerminationPendingFast
|
||||
| ComputeStatus::TerminationPendingImmediate
|
||||
| ComputeStatus::Terminated => break 'cert_update,
|
||||
// let postgres/pgbouncer/local_proxy know the new cert/key exists.
|
||||
// we need to wait until it's configurable first.
|
||||
|
||||
// wait
|
||||
ComputeStatus::Init
|
||||
| ComputeStatus::Configuration
|
||||
| ComputeStatus::RefreshConfiguration
|
||||
| ComputeStatus::RefreshConfigurationPending
|
||||
| ComputeStatus::Empty => {
|
||||
state = self.state_changed.wait(state).unwrap();
|
||||
}
|
||||
let mut state = self.state.lock().unwrap();
|
||||
'status_update: loop {
|
||||
match state.status {
|
||||
// let's update the state to config pending
|
||||
ComputeStatus::Running => {
|
||||
info!("reloading compute due to TLS certificate renewal");
|
||||
break 'status_update;
|
||||
}
|
||||
|
||||
// exit loop
|
||||
ComputeStatus::Failed
|
||||
| ComputeStatus::TerminationPendingFast
|
||||
| ComputeStatus::TerminationPendingImmediate
|
||||
| ComputeStatus::Terminated => break 'cert_update,
|
||||
|
||||
// wait
|
||||
ComputeStatus::Init
|
||||
| ComputeStatus::Configuration
|
||||
| ComputeStatus::ConfigurationPending
|
||||
| ComputeStatus::RefreshConfiguration
|
||||
| ComputeStatus::RefreshConfigurationPending
|
||||
| ComputeStatus::Reloading
|
||||
| ComputeStatus::Empty => {
|
||||
state = self.state_changed.wait(state).unwrap();
|
||||
}
|
||||
}
|
||||
drop(state);
|
||||
}
|
||||
|
||||
// wait for a new certificate update
|
||||
if handle.block_on(cert_watch.changed()).is_err() {
|
||||
break;
|
||||
let result = self.lock_while_reloading(state, |spec| {
|
||||
// ensure the keys are saved before continuing.
|
||||
// we do this while holding the 'reloading' state so that we know we're not interfering with any
|
||||
// active configuration stages.
|
||||
if let Err(e) = crate::tls::update_key_path_blocking(
|
||||
Path::new(&self.params.pgdata),
|
||||
&key_pair,
|
||||
) {
|
||||
return Ok(Err(e));
|
||||
}
|
||||
|
||||
// reload postgres/pgbouncer/local_proxy to pick up our new certificates.
|
||||
self.reload(spec)?;
|
||||
|
||||
Ok(Ok(()))
|
||||
});
|
||||
|
||||
match result {
|
||||
// Reload failed. Compute is in a bad state.
|
||||
Err(e) => {
|
||||
error!("could not reload compute node: {}", e);
|
||||
return;
|
||||
}
|
||||
// Updating the certificates failed. Retry
|
||||
Ok(Err(e)) => {
|
||||
error!("could not save TLS certificates: {e}");
|
||||
std::thread::sleep(Duration::from_millis(20));
|
||||
}
|
||||
// Successful. Acknowledge that we've saved these certificates.
|
||||
Ok(Ok(())) => {
|
||||
digest = new_digest;
|
||||
info!(
|
||||
cert_path = tls_config.cert_path,
|
||||
key_path = tls_config.key_path,
|
||||
"TLS certificates renewed",
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn tls_config(&self, spec: &ComputeSpec) -> &Option<TlsConfig> {
|
||||
@@ -2380,13 +2438,13 @@ impl ComputeNode {
|
||||
let result = client
|
||||
.simple_query(
|
||||
"SELECT
|
||||
pg_catalog.row_to_json(pss)
|
||||
row_to_json(pg_stat_statements)
|
||||
FROM
|
||||
public.pg_stat_statements pss
|
||||
pg_stat_statements
|
||||
WHERE
|
||||
pss.userid != 'cloud_admin'::pg_catalog.regrole::pg_catalog.oid
|
||||
userid != 'cloud_admin'::regrole::oid
|
||||
ORDER BY
|
||||
(pss.mean_exec_time + pss.mean_plan_time) DESC
|
||||
(mean_exec_time + mean_plan_time) DESC
|
||||
LIMIT 100",
|
||||
)
|
||||
.await;
|
||||
@@ -2514,11 +2572,11 @@ LIMIT 100",
|
||||
|
||||
// check the role grants first - to gracefully handle read-replicas.
|
||||
let select = "SELECT privilege_type
|
||||
FROM pg_catalog.pg_namespace
|
||||
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) AS acl ON true
|
||||
JOIN pg_catalog.pg_user users ON acl.grantee = users.usesysid
|
||||
WHERE users.usename OPERATOR(pg_catalog.=) $1::pg_catalog.name
|
||||
AND nspname OPERATOR(pg_catalog.=) $2::pg_catalog.name";
|
||||
FROM pg_namespace
|
||||
JOIN LATERAL (SELECT * FROM aclexplode(nspacl) AS x) acl ON true
|
||||
JOIN pg_user users ON acl.grantee = users.usesysid
|
||||
WHERE users.usename = $1
|
||||
AND nspname = $2";
|
||||
let rows = db_client
|
||||
.query(select, &[role_name, schema_name])
|
||||
.await
|
||||
@@ -2587,9 +2645,8 @@ LIMIT 100",
|
||||
.await
|
||||
.with_context(|| format!("Failed to execute query: {query}"))?;
|
||||
} else {
|
||||
let query = format!(
|
||||
"CREATE EXTENSION IF NOT EXISTS {ext_name} WITH SCHEMA public VERSION {quoted_version}"
|
||||
);
|
||||
let query =
|
||||
format!("CREATE EXTENSION IF NOT EXISTS {ext_name} WITH VERSION {quoted_version}");
|
||||
db_client
|
||||
.simple_query(&query)
|
||||
.await
|
||||
|
||||
@@ -7,8 +7,7 @@ use http::StatusCode;
|
||||
use reqwest::Client;
|
||||
use std::mem::replace;
|
||||
use std::sync::Arc;
|
||||
use tokio::{io::AsyncReadExt, select, spawn};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio::{io::AsyncReadExt, spawn};
|
||||
use tracing::{error, info};
|
||||
|
||||
#[derive(serde::Serialize, Default)]
|
||||
@@ -93,35 +92,34 @@ impl ComputeNode {
|
||||
/// If there is a prewarm request ongoing, return `false`, `true` otherwise.
|
||||
/// Has a failpoint "compute-prewarm"
|
||||
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
|
||||
let token: CancellationToken;
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap();
|
||||
token = state.lfc_prewarm_token.clone();
|
||||
if let LfcPrewarmState::Prewarming =
|
||||
replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming)
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
||||
if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
crate::metrics::LFC_PREWARMS.inc();
|
||||
|
||||
let this = self.clone();
|
||||
let cloned = self.clone();
|
||||
spawn(async move {
|
||||
let prewarm_state = match this.prewarm_impl(from_endpoint, token).await {
|
||||
Ok(state) => state,
|
||||
let state = match cloned.prewarm_impl(from_endpoint).await {
|
||||
Ok(true) => LfcPrewarmState::Completed,
|
||||
Ok(false) => {
|
||||
info!(
|
||||
"skipping LFC prewarm because LFC state is not found in endpoint storage"
|
||||
);
|
||||
LfcPrewarmState::Skipped
|
||||
}
|
||||
Err(err) => {
|
||||
crate::metrics::LFC_PREWARM_ERRORS.inc();
|
||||
error!(%err, "could not prewarm LFC");
|
||||
let error = format!("{err:#}");
|
||||
LfcPrewarmState::Failed { error }
|
||||
LfcPrewarmState::Failed {
|
||||
error: format!("{err:#}"),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let state = &mut this.state.lock().unwrap();
|
||||
if let LfcPrewarmState::Cancelled = prewarm_state {
|
||||
state.lfc_prewarm_token = CancellationToken::new();
|
||||
}
|
||||
state.lfc_prewarm_state = prewarm_state;
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = state;
|
||||
});
|
||||
true
|
||||
}
|
||||
@@ -134,70 +132,47 @@ impl ComputeNode {
|
||||
|
||||
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
|
||||
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
|
||||
async fn prewarm_impl(
|
||||
&self,
|
||||
from_endpoint: Option<String>,
|
||||
token: CancellationToken,
|
||||
) -> Result<LfcPrewarmState> {
|
||||
let EndpointStoragePair {
|
||||
url,
|
||||
token: storage_token,
|
||||
} = self.endpoint_storage_pair(from_endpoint)?;
|
||||
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<bool> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
|
||||
fail::fail_point!("compute-prewarm", |_| {
|
||||
bail!("prewarm configured to fail because of a failpoint")
|
||||
});
|
||||
|
||||
info!(%url, "requesting LFC state from endpoint storage");
|
||||
let request = Client::new().get(&url).bearer_auth(storage_token);
|
||||
let response = select! {
|
||||
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
|
||||
response = request.send() => response
|
||||
}
|
||||
.context("querying endpoint storage")?;
|
||||
|
||||
match response.status() {
|
||||
let request = Client::new().get(&url).bearer_auth(token);
|
||||
let res = request.send().await.context("querying endpoint storage")?;
|
||||
match res.status() {
|
||||
StatusCode::OK => (),
|
||||
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
|
||||
StatusCode::NOT_FOUND => {
|
||||
return Ok(false);
|
||||
}
|
||||
status => bail!("{status} querying endpoint storage"),
|
||||
}
|
||||
|
||||
let mut uncompressed = Vec::new();
|
||||
let lfc_state = select! {
|
||||
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
|
||||
lfc_state = response.bytes() => lfc_state
|
||||
}
|
||||
.context("getting request body from endpoint storage")?;
|
||||
|
||||
let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice());
|
||||
select! {
|
||||
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
|
||||
read = decoder.read_to_end(&mut uncompressed) => read
|
||||
}
|
||||
.context("decoding LFC state")?;
|
||||
|
||||
let uncompressed_len = uncompressed.len();
|
||||
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
|
||||
|
||||
// Client connection and prewarm info querying are fast and therefore don't need
|
||||
// cancellation
|
||||
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
let lfc_state = res
|
||||
.bytes()
|
||||
.await
|
||||
.context("connecting to postgres")?;
|
||||
let pg_token = client.cancel_token();
|
||||
.context("getting request body from endpoint storage")?;
|
||||
ZstdDecoder::new(lfc_state.iter().as_slice())
|
||||
.read_to_end(&mut uncompressed)
|
||||
.await
|
||||
.context("decoding LFC state")?;
|
||||
let uncompressed_len = uncompressed.len();
|
||||
|
||||
let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed];
|
||||
select! {
|
||||
res = client.query_one("select neon.prewarm_local_cache($1)", ¶ms) => res,
|
||||
_ = token.cancelled() => {
|
||||
pg_token.cancel_query(postgres::NoTls).await
|
||||
.context("cancelling neon.prewarm_local_cache()")?;
|
||||
return Ok(LfcPrewarmState::Cancelled)
|
||||
}
|
||||
}
|
||||
.context("loading LFC state into postgres")
|
||||
.map(|_| ())?;
|
||||
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres");
|
||||
|
||||
Ok(LfcPrewarmState::Completed)
|
||||
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
|
||||
.await
|
||||
.context("loading LFC state into postgres")
|
||||
.map(|_| ())?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// If offload request is ongoing, return false, true otherwise
|
||||
@@ -225,20 +200,20 @@ impl ComputeNode {
|
||||
|
||||
async fn offload_lfc_with_state_update(&self) {
|
||||
crate::metrics::LFC_OFFLOADS.inc();
|
||||
let state = match self.offload_lfc_impl().await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
|
||||
error!(%err, "could not offload LFC");
|
||||
let error = format!("{err:#}");
|
||||
LfcOffloadState::Failed { error }
|
||||
}
|
||||
|
||||
let Err(err) = self.offload_lfc_impl().await else {
|
||||
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
|
||||
return;
|
||||
};
|
||||
|
||||
self.state.lock().unwrap().lfc_offload_state = state;
|
||||
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
|
||||
error!(%err, "could not offload LFC state to endpoint storage");
|
||||
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
|
||||
error: format!("{err:#}"),
|
||||
};
|
||||
}
|
||||
|
||||
async fn offload_lfc_impl(&self) -> Result<LfcOffloadState> {
|
||||
async fn offload_lfc_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
|
||||
info!(%url, "requesting LFC state from Postgres");
|
||||
|
||||
@@ -253,7 +228,7 @@ impl ComputeNode {
|
||||
.context("deserializing LFC state")?;
|
||||
let Some(state) = state else {
|
||||
info!(%url, "empty LFC state, not exporting");
|
||||
return Ok(LfcOffloadState::Skipped);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut compressed = Vec::new();
|
||||
@@ -267,7 +242,7 @@ impl ComputeNode {
|
||||
|
||||
let request = Client::new().put(url).bearer_auth(token).body(compressed);
|
||||
match request.send().await {
|
||||
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
|
||||
Ok(res) if res.status() == StatusCode::OK => Ok(()),
|
||||
Ok(res) => bail!(
|
||||
"Request to endpoint storage failed with status: {}",
|
||||
res.status()
|
||||
@@ -275,8 +250,4 @@ impl ComputeNode {
|
||||
Err(err) => Err(err).context("writing to endpoint storage"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cancel_prewarm(self: &Arc<Self>) {
|
||||
self.state.lock().unwrap().lfc_prewarm_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,7 +78,7 @@ impl ComputeNode {
|
||||
const RETRIES: i32 = 20;
|
||||
for i in 0..=RETRIES {
|
||||
let row = client
|
||||
.query_one("SELECT pg_catalog.pg_last_wal_replay_lsn()", &[])
|
||||
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
|
||||
.await
|
||||
.context("getting last replay lsn")?;
|
||||
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
|
||||
@@ -103,7 +103,7 @@ impl ComputeNode {
|
||||
.await
|
||||
.context("setting safekeepers")?;
|
||||
client
|
||||
.query("SELECT pg_catalog.pg_reload_conf()", &[])
|
||||
.query("SELECT pg_reload_conf()", &[])
|
||||
.await
|
||||
.context("reloading postgres config")?;
|
||||
|
||||
@@ -113,7 +113,7 @@ impl ComputeNode {
|
||||
});
|
||||
|
||||
let row = client
|
||||
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
|
||||
.query_one("SELECT * FROM pg_promote()", &[])
|
||||
.await
|
||||
.context("pg_promote")?;
|
||||
if !row.get::<usize, bool>(0) {
|
||||
|
||||
@@ -16,7 +16,7 @@ use crate::pg_helpers::{
|
||||
DatabricksSettingsExt as _, GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize,
|
||||
escape_conf_value,
|
||||
};
|
||||
use crate::tls::{self, SERVER_CRT, SERVER_KEY};
|
||||
use crate::tls::{SERVER_CRT, SERVER_KEY};
|
||||
|
||||
use utils::shard::{ShardIndex, ShardNumber};
|
||||
|
||||
@@ -178,14 +178,9 @@ pub fn write_postgres_conf(
|
||||
}
|
||||
|
||||
// tls
|
||||
if let Some(tls_config) = tls_config {
|
||||
if tls_config.is_some() {
|
||||
writeln!(file, "ssl = on")?;
|
||||
|
||||
// postgres requires the keyfile to be in a secure file,
|
||||
// currently too complicated to ensure that at the VM level,
|
||||
// so we just copy them to another file instead. :shrug:
|
||||
tls::update_key_path_blocking(pgdata_path, tls_config);
|
||||
|
||||
// these are the default, but good to be explicit.
|
||||
writeln!(file, "ssl_cert_file = '{SERVER_CRT}'")?;
|
||||
writeln!(file, "ssl_key_file = '{SERVER_KEY}'")?;
|
||||
|
||||
@@ -139,15 +139,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/LfcPrewarmState"
|
||||
delete:
|
||||
tags:
|
||||
- Prewarm
|
||||
summary: Cancel ongoing LFC prewarm
|
||||
description: ""
|
||||
operationId: cancelLfcPrewarm
|
||||
responses:
|
||||
202:
|
||||
description: Prewarm cancelled
|
||||
|
||||
/lfc/offload:
|
||||
post:
|
||||
@@ -645,7 +636,7 @@ components:
|
||||
properties:
|
||||
status:
|
||||
description: LFC offload status
|
||||
enum: [not_offloaded, offloading, completed, skipped, failed]
|
||||
enum: [not_offloaded, offloading, completed, failed]
|
||||
type: string
|
||||
error:
|
||||
description: LFC offload error, if any
|
||||
|
||||
@@ -12,8 +12,10 @@ use crate::http::JsonResponse;
|
||||
/// Check that the compute is currently running.
|
||||
pub(in crate::http) async fn is_writable(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||
let status = compute.get_status();
|
||||
if status != ComputeStatus::Running {
|
||||
return JsonResponse::invalid_status(status);
|
||||
match status {
|
||||
// If we are running, or just reloading the config, we are ok to write a new config.
|
||||
ComputeStatus::Running | ComputeStatus::Reloading => {}
|
||||
_ => return JsonResponse::invalid_status(status),
|
||||
}
|
||||
|
||||
match check_writability(&compute).await {
|
||||
|
||||
@@ -27,32 +27,6 @@ pub(in crate::http) async fn configure(
|
||||
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
|
||||
};
|
||||
|
||||
// XXX: wrap state update under lock in a code block. Otherwise, we will try
|
||||
// to `Send` `mut state` into the spawned thread bellow, which will cause
|
||||
// the following rustc error:
|
||||
//
|
||||
// error: future cannot be sent between threads safely
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
|
||||
return JsonResponse::invalid_status(state.status);
|
||||
}
|
||||
|
||||
// Pass the tracing span to the main thread that performs the startup,
|
||||
// so that the start_compute operation is considered a child of this
|
||||
// configure request for tracing purposes.
|
||||
state.startup_span = Some(tracing::Span::current());
|
||||
|
||||
if compute.params.lakebase_mode {
|
||||
ComputeNode::set_spec(&compute.params, &mut state, pspec);
|
||||
} else {
|
||||
state.pspec = Some(pspec);
|
||||
}
|
||||
|
||||
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
|
||||
drop(state);
|
||||
}
|
||||
|
||||
// Spawn a blocking thread to wait for compute to become Running. This is
|
||||
// needed to not block the main pool of workers and to be able to serve
|
||||
// other requests while some particular request is waiting for compute to
|
||||
@@ -60,6 +34,32 @@ pub(in crate::http) async fn configure(
|
||||
let c = compute.clone();
|
||||
let completed = task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
loop {
|
||||
match state.status {
|
||||
// ideal state.
|
||||
ComputeStatus::Empty | ComputeStatus::Running => break,
|
||||
// we need to wait until reloaded
|
||||
ComputeStatus::Reloading => {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
}
|
||||
// All other cases are unexpected.
|
||||
_ => return Err(JsonResponse::invalid_status(state.status)),
|
||||
}
|
||||
}
|
||||
|
||||
// Pass the tracing span to the main thread that performs the startup,
|
||||
// so that the start_compute operation is considered a child of this
|
||||
// configure request for tracing purposes.
|
||||
state.startup_span = Some(tracing::Span::current());
|
||||
|
||||
if c.params.lakebase_mode {
|
||||
ComputeNode::set_spec(&c.params, &mut state, pspec);
|
||||
} else {
|
||||
state.pspec = Some(pspec);
|
||||
}
|
||||
|
||||
state.set_status(ComputeStatus::ConfigurationPending, &c.state_changed);
|
||||
|
||||
while state.status != ComputeStatus::Running {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
@@ -71,7 +71,7 @@ pub(in crate::http) async fn configure(
|
||||
if state.status == ComputeStatus::Failed {
|
||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||
let msg = format!("compute configuration failed: {err:?}");
|
||||
return Err(msg);
|
||||
return Err(JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, msg));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ pub(in crate::http) async fn configure(
|
||||
.unwrap();
|
||||
|
||||
if let Err(e) = completed {
|
||||
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
|
||||
return e;
|
||||
}
|
||||
|
||||
// Return current compute state if everything went well.
|
||||
|
||||
@@ -46,8 +46,3 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode {
|
||||
compute.cancel_prewarm();
|
||||
StatusCode::ACCEPTED
|
||||
}
|
||||
|
||||
@@ -99,12 +99,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
);
|
||||
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route(
|
||||
"/lfc/prewarm",
|
||||
get(lfc::prewarm_state)
|
||||
.post(lfc::prewarm)
|
||||
.delete(lfc::cancel_prewarm),
|
||||
)
|
||||
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
|
||||
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
|
||||
.route("/promote", post(promote::promote))
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
|
||||
@@ -19,7 +19,7 @@ async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
|
||||
.query(
|
||||
"SELECT datname FROM pg_catalog.pg_database
|
||||
WHERE datallowconn
|
||||
AND datconnlimit OPERATOR(pg_catalog.<>) (OPERATOR(pg_catalog.-) 2::pg_catalog.int4)
|
||||
AND datconnlimit <> - 2
|
||||
LIMIT 500",
|
||||
&[],
|
||||
)
|
||||
@@ -67,7 +67,7 @@ pub async fn get_installed_extensions(
|
||||
|
||||
let extensions: Vec<(String, String, i32)> = client
|
||||
.query(
|
||||
"SELECT extname, extversion, extowner::pg_catalog.int4 FROM pg_catalog.pg_extension",
|
||||
"SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
|
||||
&[],
|
||||
)
|
||||
.await?
|
||||
|
||||
@@ -11,9 +11,11 @@ use utils::pid_file::{self, PidFileRead};
|
||||
|
||||
pub fn configure(local_proxy: &LocalProxySpec) -> Result<()> {
|
||||
write_local_proxy_conf("/etc/local_proxy/config.json".as_ref(), local_proxy)?;
|
||||
notify_local_proxy("/etc/local_proxy/pid".as_ref())?;
|
||||
reload()
|
||||
}
|
||||
|
||||
Ok(())
|
||||
pub fn reload() -> Result<()> {
|
||||
notify_local_proxy("/etc/local_proxy/pid".as_ref())
|
||||
}
|
||||
|
||||
/// Create or completely rewrite configuration file specified by `path`
|
||||
|
||||
@@ -76,7 +76,7 @@ impl<'m> MigrationRunner<'m> {
|
||||
self.client
|
||||
.simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
|
||||
.await?;
|
||||
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key pg_catalog.int4 NOT NULL PRIMARY KEY, id pg_catalog.int8 NOT NULL DEFAULT 0)").await?;
|
||||
self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
|
||||
self.client
|
||||
.simple_query(
|
||||
"INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
|
||||
|
||||
@@ -15,17 +15,17 @@ DO $$
|
||||
DECLARE
|
||||
role_name text;
|
||||
BEGIN
|
||||
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member')
|
||||
FOR role_name IN SELECT rolname FROM pg_roles WHERE pg_has_role(rolname, '{privileged_role_name}', 'member')
|
||||
LOOP
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', pg_catalog.quote_ident(role_name);
|
||||
EXECUTE pg_catalog.format('ALTER ROLE %I INHERIT;', role_name);
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % INHERIT', quote_ident(role_name);
|
||||
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' INHERIT';
|
||||
END LOOP;
|
||||
|
||||
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles
|
||||
FOR role_name IN SELECT rolname FROM pg_roles
|
||||
WHERE
|
||||
NOT pg_catalog.pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT pg_catalog.starts_with(rolname, 'pg_')
|
||||
NOT pg_has_role(rolname, '{privileged_role_name}', 'member') AND NOT starts_with(rolname, 'pg_')
|
||||
LOOP
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', pg_catalog.quote_ident(role_name);
|
||||
EXECUTE pg_catalog.format('ALTER ROLE %I NOBYPASSRLS;', role_name);
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOBYPASSRLS', quote_ident(role_name);
|
||||
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOBYPASSRLS';
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name = 'server_version_num') THEN
|
||||
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
|
||||
EXECUTE 'GRANT pg_create_subscription TO {privileged_role_name}';
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
@@ -5,9 +5,9 @@ DO $$
|
||||
DECLARE
|
||||
role_name TEXT;
|
||||
BEGIN
|
||||
FOR role_name IN SELECT rolname FROM pg_catalog.pg_roles WHERE rolreplication IS TRUE
|
||||
FOR role_name IN SELECT rolname FROM pg_roles WHERE rolreplication IS TRUE
|
||||
LOOP
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', pg_catalog.quote_ident(role_name);
|
||||
EXECUTE pg_catalog.format('ALTER ROLE %I NOREPLICATION;', role_name);
|
||||
RAISE NOTICE 'EXECUTING ALTER ROLE % NOREPLICATION', quote_ident(role_name);
|
||||
EXECUTE 'ALTER ROLE ' || quote_ident(role_name) || ' NOREPLICATION';
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF (SELECT setting::pg_catalog.numeric >= 160000 FROM pg_catalog.pg_settings WHERE name OPERATOR(pg_catalog.=) 'server_version_num'::pg_catalog.text) THEN
|
||||
IF (SELECT setting::numeric >= 160000 FROM pg_settings WHERE name = 'server_version_num') THEN
|
||||
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_export_snapshot TO {privileged_role_name}';
|
||||
EXECUTE 'GRANT EXECUTE ON FUNCTION pg_log_standby_snapshot TO {privileged_role_name}';
|
||||
END IF;
|
||||
|
||||
@@ -2,7 +2,7 @@ DO $$
|
||||
DECLARE
|
||||
bypassrls boolean;
|
||||
BEGIN
|
||||
SELECT rolbypassrls INTO bypassrls FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser';
|
||||
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
|
||||
IF NOT bypassrls THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
|
||||
END IF;
|
||||
|
||||
@@ -4,8 +4,8 @@ DECLARE
|
||||
BEGIN
|
||||
FOR role IN
|
||||
SELECT rolname AS name, rolinherit AS inherit
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
FROM pg_roles
|
||||
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
LOOP
|
||||
IF NOT role.inherit THEN
|
||||
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
|
||||
@@ -14,12 +14,12 @@ BEGIN
|
||||
|
||||
FOR role IN
|
||||
SELECT rolname AS name, rolbypassrls AS bypassrls
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE NOT pg_catalog.pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
AND NOT pg_catalog.starts_with(rolname, 'pg_')
|
||||
FROM pg_roles
|
||||
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
|
||||
AND NOT starts_with(rolname, 'pg_')
|
||||
LOOP
|
||||
IF role.bypassrls THEN
|
||||
RAISE EXCEPTION '% can bypass RLS', pg_catalog.quote_ident(role.name);
|
||||
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
|
||||
END IF;
|
||||
END LOOP;
|
||||
END $$;
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF (SELECT pg_catalog.current_setting('server_version_num')::pg_catalog.numeric < 160000) THEN
|
||||
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
IF NOT (SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
|
||||
IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
|
||||
END IF;
|
||||
END $$;
|
||||
|
||||
@@ -2,12 +2,12 @@ DO $$
|
||||
DECLARE
|
||||
monitor record;
|
||||
BEGIN
|
||||
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
|
||||
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
|
||||
admin_option AS admin
|
||||
INTO monitor
|
||||
FROM pg_catalog.pg_auth_members
|
||||
WHERE roleid = 'pg_monitor'::pg_catalog.regrole
|
||||
AND member = 'neon_superuser'::pg_catalog.regrole;
|
||||
FROM pg_auth_members
|
||||
WHERE roleid = 'pg_monitor'::regrole
|
||||
AND member = 'neon_superuser'::regrole;
|
||||
|
||||
IF monitor IS NULL THEN
|
||||
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';
|
||||
|
||||
@@ -2,11 +2,11 @@ DO $$
|
||||
DECLARE
|
||||
can_execute boolean;
|
||||
BEGIN
|
||||
SELECT pg_catalog.bool_and(pg_catalog.has_function_privilege('neon_superuser', oid, 'execute'))
|
||||
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
|
||||
INTO can_execute
|
||||
FROM pg_catalog.pg_proc
|
||||
FROM pg_proc
|
||||
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
|
||||
AND pronamespace = 'pg_catalog'::pg_catalog.regnamespace;
|
||||
AND pronamespace = 'pg_catalog'::regnamespace;
|
||||
IF NOT can_execute THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
|
||||
END IF;
|
||||
|
||||
@@ -2,9 +2,9 @@ DO $$
|
||||
DECLARE
|
||||
can_execute boolean;
|
||||
BEGIN
|
||||
SELECT pg_catalog.has_function_privilege('neon_superuser', oid, 'execute')
|
||||
SELECT has_function_privilege('neon_superuser', oid, 'execute')
|
||||
INTO can_execute
|
||||
FROM pg_catalog.pg_proc
|
||||
FROM pg_proc
|
||||
WHERE proname = 'pg_show_replication_origin_status'
|
||||
AND pronamespace = 'pg_catalog'::regnamespace;
|
||||
IF NOT can_execute THEN
|
||||
|
||||
@@ -2,10 +2,10 @@ DO $$
|
||||
DECLARE
|
||||
signal_backend record;
|
||||
BEGIN
|
||||
SELECT pg_catalog.pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
|
||||
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
|
||||
admin_option AS admin
|
||||
INTO signal_backend
|
||||
FROM pg_catalog.pg_auth_members
|
||||
FROM pg_auth_members
|
||||
WHERE roleid = 'pg_signal_backend'::regrole
|
||||
AND member = 'neon_superuser'::regrole;
|
||||
|
||||
|
||||
@@ -407,9 +407,9 @@ fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
|
||||
// like `postgres_exporter` use it to query Postgres statistics.
|
||||
// Use explicit 8 bytes type casts to match Rust types.
|
||||
let stats = cli.query_one(
|
||||
"SELECT pg_catalog.coalesce(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time,
|
||||
pg_catalog.coalesce(pg_catalog.sum(sessions), 0)::pg_catalog.bigint AS total_sessions
|
||||
FROM pg_catalog.pg_stat_database
|
||||
"SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time,
|
||||
coalesce(sum(sessions), 0)::bigint AS total_sessions
|
||||
FROM pg_stat_database
|
||||
WHERE datname NOT IN (
|
||||
'postgres',
|
||||
'template0',
|
||||
@@ -445,11 +445,11 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
|
||||
let mut last_active: Option<DateTime<Utc>> = None;
|
||||
// Get all running client backends except ourself, use RFC3339 DateTime format.
|
||||
let backends = cli.query(
|
||||
"SELECT state, pg_catalog.to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"'::pg_catalog.text) AS state_change
|
||||
"SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change
|
||||
FROM pg_stat_activity
|
||||
WHERE backend_type OPERATOR(pg_catalog.=) 'client backend'::pg_catalog.text
|
||||
AND pid OPERATOR(pg_catalog.!=) pg_catalog.pg_backend_pid()
|
||||
AND usename OPERATOR(pg_catalog.!=) 'cloud_admin'::pg_catalog.name;", // XXX: find a better way to filter other monitors?
|
||||
WHERE backend_type = 'client backend'
|
||||
AND pid != pg_backend_pid()
|
||||
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
|
||||
&[],
|
||||
);
|
||||
|
||||
|
||||
@@ -299,9 +299,9 @@ pub async fn get_existing_dbs_async(
|
||||
.query_raw::<str, &String, &[String; 0]>(
|
||||
"SELECT
|
||||
datname AS name,
|
||||
(SELECT rolname FROM pg_catalog.pg_roles WHERE oid OPERATOR(pg_catalog.=) datdba) AS owner,
|
||||
(SELECT rolname FROM pg_roles WHERE oid = datdba) AS owner,
|
||||
NOT datallowconn AS restrict_conn,
|
||||
datconnlimit OPERATOR(pg_catalog.=) (OPERATOR(pg_catalog.-) 2) AS invalid
|
||||
datconnlimit = - 2 AS invalid
|
||||
FROM
|
||||
pg_catalog.pg_database;",
|
||||
&[],
|
||||
@@ -466,13 +466,7 @@ fn update_pgbouncer_ini(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tune pgbouncer.
|
||||
/// 1. Apply new config using pgbouncer admin console
|
||||
/// 2. Add new values to pgbouncer.ini to preserve them after restart
|
||||
pub async fn tune_pgbouncer(
|
||||
mut pgbouncer_config: IndexMap<String, String>,
|
||||
tls_config: Option<TlsConfig>,
|
||||
) -> Result<()> {
|
||||
async fn connect() -> Result<tokio_postgres::Client> {
|
||||
let pgbouncer_connstr = if std::env::var_os("AUTOSCALING").is_some() {
|
||||
// for VMs use pgbouncer specific way to connect to
|
||||
// pgbouncer admin console without password
|
||||
@@ -518,18 +512,17 @@ pub async fn tune_pgbouncer(
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(tls_config) = tls_config {
|
||||
// pgbouncer starts in a half-ok state if it cannot find these files.
|
||||
// It will default to client_tls_sslmode=deny, which causes proxy to error.
|
||||
// There is a small window at startup where these files don't yet exist in the VM.
|
||||
// Best to wait until it exists.
|
||||
loop {
|
||||
if let Ok(true) = tokio::fs::try_exists(&tls_config.key_path).await {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(500)).await
|
||||
}
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
/// Tune pgbouncer.
|
||||
/// 1. Apply new config to pgbouncer.ini
|
||||
/// 2. Notify pgbouncer to reload
|
||||
pub async fn tune_pgbouncer(
|
||||
mut pgbouncer_config: IndexMap<String, String>,
|
||||
tls_config: Option<TlsConfig>,
|
||||
) -> Result<()> {
|
||||
if let Some(tls_config) = tls_config {
|
||||
pgbouncer_config.insert("client_tls_cert_file".to_string(), tls_config.cert_path);
|
||||
pgbouncer_config.insert("client_tls_key_file".to_string(), tls_config.key_path);
|
||||
pgbouncer_config.insert("client_tls_sslmode".to_string(), "allow".to_string());
|
||||
@@ -550,10 +543,17 @@ pub async fn tune_pgbouncer(
|
||||
|
||||
info!("Applying pgbouncer setting change");
|
||||
|
||||
reload_pgbouncer().await
|
||||
}
|
||||
|
||||
/// Reload pgbouncer.
|
||||
pub async fn reload_pgbouncer() -> Result<()> {
|
||||
let client = connect().await?;
|
||||
|
||||
if let Err(err) = client.simple_query("RELOAD").await {
|
||||
// Don't fail on error, just print it into log
|
||||
error!("Failed to apply pgbouncer setting change, {err}",);
|
||||
};
|
||||
error!("Failed to apply pgbouncer setting change: {err}",);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ impl ComputeNode {
|
||||
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
|
||||
|
||||
drop_subscriptions_done = match
|
||||
client.query("select 1 from neon.drop_subscriptions_done where timeline_id OPERATOR(pg_catalog.=) $1", &[&timeline_id.to_string()]).await {
|
||||
client.query("select 1 from neon.drop_subscriptions_done where timeline_id = $1", &[&timeline_id.to_string()]).await {
|
||||
Ok(result) => !result.is_empty(),
|
||||
Err(e) =>
|
||||
{
|
||||
@@ -1142,9 +1142,7 @@ async fn get_operations<'a>(
|
||||
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
|
||||
if libs.contains("pg_stat_statements") {
|
||||
return Ok(Box::new(once(Operation {
|
||||
query: String::from(
|
||||
"CREATE EXTENSION IF NOT EXISTS pg_stat_statements WITH SCHEMA public",
|
||||
),
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pg_stat_statements"),
|
||||
comment: Some(String::from("create system extensions")),
|
||||
})));
|
||||
}
|
||||
@@ -1152,13 +1150,11 @@ async fn get_operations<'a>(
|
||||
Ok(Box::new(empty()))
|
||||
}
|
||||
ApplySpecPhase::CreatePgauditExtension => Ok(Box::new(once(Operation {
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit WITH SCHEMA public"),
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgaudit"),
|
||||
comment: Some(String::from("create pgaudit extensions")),
|
||||
}))),
|
||||
ApplySpecPhase::CreatePgauditlogtofileExtension => Ok(Box::new(once(Operation {
|
||||
query: String::from(
|
||||
"CREATE EXTENSION IF NOT EXISTS pgauditlogtofile WITH SCHEMA public",
|
||||
),
|
||||
query: String::from("CREATE EXTENSION IF NOT EXISTS pgauditlogtofile"),
|
||||
comment: Some(String::from("create pgauditlogtofile extensions")),
|
||||
}))),
|
||||
// Disable pgaudit logging for postgres database.
|
||||
@@ -1182,7 +1178,7 @@ async fn get_operations<'a>(
|
||||
},
|
||||
Operation {
|
||||
query: String::from(
|
||||
"UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name AND extrelocatable OPERATOR(pg_catalog.=) false",
|
||||
"UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'",
|
||||
),
|
||||
comment: Some(String::from("compat/fix: make neon relocatable")),
|
||||
},
|
||||
|
||||
@@ -3,17 +3,16 @@ BEGIN
|
||||
IF NOT EXISTS(
|
||||
SELECT 1
|
||||
FROM pg_catalog.pg_tables
|
||||
WHERE tablename::pg_catalog.name OPERATOR(pg_catalog.=) 'health_check'::pg_catalog.name
|
||||
AND schemaname::pg_catalog.name OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
|
||||
WHERE tablename = 'health_check'
|
||||
)
|
||||
THEN
|
||||
CREATE TABLE public.health_check (
|
||||
id pg_catalog.int4 primary key generated by default as identity,
|
||||
updated_at pg_catalog.timestamptz default pg_catalog.now()
|
||||
CREATE TABLE health_check (
|
||||
id serial primary key,
|
||||
updated_at timestamptz default now()
|
||||
);
|
||||
INSERT INTO public.health_check VALUES (1, pg_catalog.now())
|
||||
INSERT INTO health_check VALUES (1, now())
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET updated_at = pg_catalog.now();
|
||||
SET updated_at = now();
|
||||
END IF;
|
||||
END
|
||||
$$
|
||||
@@ -2,11 +2,10 @@ DO $$
|
||||
DECLARE
|
||||
query varchar;
|
||||
BEGIN
|
||||
FOR query IN
|
||||
SELECT pg_catalog.format('ALTER FUNCTION %I(%s) OWNER TO {db_owner};', p.oid::regproc, pg_catalog.pg_get_function_identity_arguments(p.oid))
|
||||
FROM pg_catalog.pg_proc p
|
||||
WHERE p.pronamespace OPERATOR(pg_catalog.=) 'anon'::regnamespace::oid
|
||||
LOOP
|
||||
FOR query IN SELECT 'ALTER FUNCTION '||nsp.nspname||'.'||p.proname||'('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {db_owner};'
|
||||
FROM pg_proc p
|
||||
JOIN pg_namespace nsp ON p.pronamespace = nsp.oid
|
||||
WHERE nsp.nspname = 'anon' LOOP
|
||||
EXECUTE query;
|
||||
END LOOP;
|
||||
END
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname OPERATOR(pg_catalog.=) '{privileged_role_name}'::pg_catalog.name)
|
||||
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{privileged_role_name}')
|
||||
THEN
|
||||
CREATE ROLE {privileged_role_name} {privileges} IN ROLE pg_read_all_data, pg_write_all_data;
|
||||
END IF;
|
||||
|
||||
@@ -4,14 +4,14 @@ $$
|
||||
IF EXISTS(
|
||||
SELECT nspname
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'
|
||||
WHERE nspname = 'public'
|
||||
) AND
|
||||
pg_catalog.current_setting('server_version_num')::int OPERATOR(pg_catalog./) 10000 OPERATOR(pg_catalog.>=) 15
|
||||
current_setting('server_version_num')::int / 10000 >= 15
|
||||
THEN
|
||||
IF EXISTS(
|
||||
SELECT rolname
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE rolname OPERATOR(pg_catalog.=) 'web_access'
|
||||
WHERE rolname = 'web_access'
|
||||
)
|
||||
THEN
|
||||
GRANT CREATE ON SCHEMA public TO web_access;
|
||||
@@ -20,7 +20,7 @@ $$
|
||||
IF EXISTS(
|
||||
SELECT nspname
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'
|
||||
WHERE nspname = 'public'
|
||||
)
|
||||
THEN
|
||||
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO neon_superuser WITH GRANT OPTION;
|
||||
|
||||
@@ -2,17 +2,11 @@ DO ${outer_tag}$
|
||||
DECLARE
|
||||
subname TEXT;
|
||||
BEGIN
|
||||
LOCK TABLE pg_catalog.pg_subscription IN ACCESS EXCLUSIVE MODE;
|
||||
FOR subname IN
|
||||
SELECT pg_subscription.subname
|
||||
FROM pg_catalog.pg_subscription
|
||||
WHERE subdbid OPERATOR(pg_catalog.=) (
|
||||
SELECT oid FROM pg_database WHERE datname OPERATOR(pg_catalog.=) {datname_str}::pg_catalog.name
|
||||
)
|
||||
LOOP
|
||||
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I DISABLE;', subname);
|
||||
EXECUTE pg_catalog.format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
|
||||
EXECUTE pg_catalog.format('DROP SUBSCRIPTION %I;', subname);
|
||||
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
|
||||
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
|
||||
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);
|
||||
EXECUTE format('DROP SUBSCRIPTION %I;', subname);
|
||||
END LOOP;
|
||||
END;
|
||||
${outer_tag}$;
|
||||
|
||||
@@ -3,19 +3,19 @@ BEGIN
|
||||
IF NOT EXISTS(
|
||||
SELECT 1
|
||||
FROM pg_catalog.pg_tables
|
||||
WHERE tablename OPERATOR(pg_catalog.=) 'drop_subscriptions_done'::pg_catalog.name
|
||||
AND schemaname OPERATOR(pg_catalog.=) 'neon'::pg_catalog.name
|
||||
WHERE tablename = 'drop_subscriptions_done'
|
||||
AND schemaname = 'neon'
|
||||
)
|
||||
THEN
|
||||
CREATE TABLE neon.drop_subscriptions_done
|
||||
(id pg_catalog.int4 primary key generated by default as identity, timeline_id pg_catalog.text);
|
||||
(id serial primary key, timeline_id text);
|
||||
END IF;
|
||||
|
||||
-- preserve the timeline_id of the last drop_subscriptions run
|
||||
-- to ensure that the cleanup of a timeline is executed only once.
|
||||
-- use upsert to avoid the table bloat in case of cascade branching (branch of a branch)
|
||||
INSERT INTO neon.drop_subscriptions_done VALUES (1, pg_catalog.current_setting('neon.timeline_id'))
|
||||
INSERT INTO neon.drop_subscriptions_done VALUES (1, current_setting('neon.timeline_id'))
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET timeline_id = pg_catalog.current_setting('neon.timeline_id')::pg_catalog.text;
|
||||
SET timeline_id = current_setting('neon.timeline_id');
|
||||
END
|
||||
$$
|
||||
|
||||
@@ -15,15 +15,15 @@ BEGIN
|
||||
WHERE schema_name IN ('public')
|
||||
LOOP
|
||||
FOR grantor IN EXECUTE
|
||||
pg_catalog.format(
|
||||
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee OPERATOR(pg_catalog.=) %s',
|
||||
format(
|
||||
'SELECT DISTINCT rtg.grantor FROM information_schema.role_table_grants AS rtg WHERE grantee = %s',
|
||||
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
quote_literal({role_name})
|
||||
)
|
||||
LOOP
|
||||
EXECUTE pg_catalog.format('SET LOCAL ROLE %I', grantor);
|
||||
EXECUTE format('SET LOCAL ROLE %I', grantor);
|
||||
|
||||
revoke_query := pg_catalog.format(
|
||||
revoke_query := format(
|
||||
'REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA %I FROM %I GRANTED BY %I',
|
||||
schema,
|
||||
-- N.B. this has to be properly dollar-escaped with `pg_quote_dollar()`
|
||||
|
||||
@@ -5,17 +5,17 @@ DO ${outer_tag}$
|
||||
IF EXISTS(
|
||||
SELECT nspname
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.name
|
||||
WHERE nspname = 'public'
|
||||
)
|
||||
THEN
|
||||
SELECT nspowner::regrole::text
|
||||
FROM pg_catalog.pg_namespace
|
||||
WHERE nspname OPERATOR(pg_catalog.=) 'public'::pg_catalog.text
|
||||
WHERE nspname = 'public'
|
||||
INTO schema_owner;
|
||||
|
||||
IF schema_owner OPERATOR(pg_catalog.=) 'cloud_admin'::pg_catalog.text OR schema_owner OPERATOR(pg_catalog.=) 'zenith_admin'::pg_catalog.text
|
||||
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'
|
||||
THEN
|
||||
EXECUTE pg_catalog.format('ALTER SCHEMA public OWNER TO %I', {db_owner});
|
||||
EXECUTE format('ALTER SCHEMA public OWNER TO %I', {db_owner});
|
||||
END IF;
|
||||
END IF;
|
||||
END
|
||||
|
||||
@@ -3,10 +3,10 @@ DO ${outer_tag}$
|
||||
IF EXISTS(
|
||||
SELECT 1
|
||||
FROM pg_catalog.pg_database
|
||||
WHERE datname OPERATOR(pg_catalog.=) {datname}::pg_catalog.name
|
||||
WHERE datname = {datname}
|
||||
)
|
||||
THEN
|
||||
EXECUTE pg_catalog.format('ALTER DATABASE %I is_template false', {datname});
|
||||
EXECUTE format('ALTER DATABASE %I is_template false', {datname});
|
||||
END IF;
|
||||
END
|
||||
${outer_tag}$;
|
||||
|
||||
@@ -3,42 +3,43 @@ use std::{io::Write, os::unix::fs::OpenOptionsExt, path::Path, time::Duration};
|
||||
use anyhow::{Context, Result, bail};
|
||||
use compute_api::responses::TlsConfig;
|
||||
use ring::digest;
|
||||
use x509_cert::Certificate;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct CertDigest(digest::Digest);
|
||||
|
||||
pub async fn watch_cert_for_changes(cert_path: String) -> tokio::sync::watch::Receiver<CertDigest> {
|
||||
let mut digest = compute_digest(&cert_path).await;
|
||||
let (tx, rx) = tokio::sync::watch::channel(digest);
|
||||
tokio::spawn(async move {
|
||||
while !tx.is_closed() {
|
||||
let new_digest = compute_digest(&cert_path).await;
|
||||
if digest.0.as_ref() != new_digest.0.as_ref() {
|
||||
digest = new_digest;
|
||||
_ = tx.send(digest);
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(60)).await
|
||||
}
|
||||
});
|
||||
rx
|
||||
impl PartialEq for CertDigest {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.as_ref() == other.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
async fn compute_digest(cert_path: &str) -> CertDigest {
|
||||
pub fn wait_until_cert_changed(digest: CertDigest, cert_path: &str) -> CertDigest {
|
||||
loop {
|
||||
match try_compute_digest(cert_path).await {
|
||||
let new_digest = compute_digest(cert_path);
|
||||
if digest != new_digest {
|
||||
break new_digest;
|
||||
}
|
||||
|
||||
// Wait a while before checking the certificates.
|
||||
// We renew on a daily basis, so there's no rush.
|
||||
std::thread::sleep(Duration::from_secs(60));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn compute_digest(cert_path: &str) -> CertDigest {
|
||||
loop {
|
||||
match try_compute_digest(cert_path) {
|
||||
Ok(d) => break d,
|
||||
Err(e) => {
|
||||
tracing::error!("could not read cert file {e:?}");
|
||||
tokio::time::sleep(Duration::from_secs(1)).await
|
||||
std::thread::sleep(Duration::from_secs(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_compute_digest(cert_path: &str) -> Result<CertDigest> {
|
||||
let data = tokio::fs::read(cert_path).await?;
|
||||
fn try_compute_digest(cert_path: &str) -> Result<CertDigest> {
|
||||
let data = std::fs::read(cert_path)?;
|
||||
// sha256 is extremely collision resistent. can safely assume the digest to be unique
|
||||
Ok(CertDigest(digest::digest(&digest::SHA256, &data)))
|
||||
}
|
||||
@@ -46,28 +47,37 @@ async fn try_compute_digest(cert_path: &str) -> Result<CertDigest> {
|
||||
pub const SERVER_CRT: &str = "server.crt";
|
||||
pub const SERVER_KEY: &str = "server.key";
|
||||
|
||||
pub fn update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) {
|
||||
pub struct KeyPair {
|
||||
crt: String,
|
||||
key: String,
|
||||
}
|
||||
|
||||
pub fn load_certs_blocking(tls_config: &TlsConfig) -> KeyPair {
|
||||
loop {
|
||||
match try_update_key_path_blocking(pg_data, tls_config) {
|
||||
Ok(()) => break,
|
||||
match try_load_certs_blocking(tls_config) {
|
||||
Ok(key_pair) => break key_pair,
|
||||
Err(e) => {
|
||||
tracing::error!(error = ?e, "could not create key file");
|
||||
tracing::error!(error = ?e, "could not load certs");
|
||||
std::thread::sleep(Duration::from_secs(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Postgres requires the keypath be "secure". This means
|
||||
// 1. Owned by the postgres user.
|
||||
// 2. Have permission 600.
|
||||
fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Result<()> {
|
||||
fn try_load_certs_blocking(tls_config: &TlsConfig) -> Result<KeyPair> {
|
||||
let key = std::fs::read_to_string(&tls_config.key_path)?;
|
||||
let crt = std::fs::read_to_string(&tls_config.cert_path)?;
|
||||
|
||||
// to mitigate a race condition during renewal.
|
||||
verify_key_cert(&key, &crt)?;
|
||||
|
||||
Ok(KeyPair { key, crt })
|
||||
}
|
||||
|
||||
// Postgres requires the keypath be "secure". This means
|
||||
// 1. Owned by the postgres user.
|
||||
// 2. Have permission 600.
|
||||
pub fn update_key_path_blocking(pg_data: &Path, key_pair: &KeyPair) -> Result<()> {
|
||||
let mut key_file = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
@@ -82,14 +92,22 @@ fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Resul
|
||||
.mode(0o600)
|
||||
.open(pg_data.join(SERVER_CRT))?;
|
||||
|
||||
key_file.write_all(key.as_bytes())?;
|
||||
crt_file.write_all(crt.as_bytes())?;
|
||||
// NOTE: We currently ensure that an explicit reload does not happen during TLS renewal, but
|
||||
// there's a chance that postgres/pgbouncer/local_proxy reloads implicitly halfway between
|
||||
// these writes. This could allow them to reads the wrong keys to the wrong certs.
|
||||
// There doesn't seem to be any way to prevent that. However, we will issue a reload shortly
|
||||
// after which should at least correct it.
|
||||
key_file.write_all(key_pair.key.as_bytes())?;
|
||||
crt_file.write_all(key_pair.crt.as_bytes())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
|
||||
use x509_cert::Certificate;
|
||||
use x509_cert::der::oid::db::rfc5912::ECDSA_WITH_SHA_256;
|
||||
use x509_cert::der::oid::db::rfc8410::ID_ED_25519;
|
||||
use x509_cert::der::pem;
|
||||
|
||||
let certs = Certificate::load_pem_chain(cert.as_bytes())
|
||||
.context("decoding PEM encoded certificates")?;
|
||||
@@ -100,22 +118,30 @@ fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
|
||||
bail!("no certificates found");
|
||||
};
|
||||
|
||||
let pubkey = cert
|
||||
.tbs_certificate
|
||||
.subject_public_key_info
|
||||
.subject_public_key
|
||||
.raw_bytes();
|
||||
|
||||
match cert.signature_algorithm.oid {
|
||||
ECDSA_WITH_SHA_256 => {
|
||||
let key = p256::SecretKey::from_sec1_pem(key).context("parse key")?;
|
||||
|
||||
let a = key.public_key().to_sec1_bytes();
|
||||
let b = cert
|
||||
.tbs_certificate
|
||||
.subject_public_key_info
|
||||
.subject_public_key
|
||||
.raw_bytes();
|
||||
|
||||
if *a != *b {
|
||||
if *key.public_key().to_sec1_bytes() != *pubkey {
|
||||
bail!("private key file does not match certificate")
|
||||
}
|
||||
}
|
||||
_ => bail!("unknown TLS key type"),
|
||||
ID_ED_25519 => {
|
||||
use ring::signature::{Ed25519KeyPair, KeyPair};
|
||||
|
||||
let (_, bytes) = pem::decode_vec(key.as_bytes())
|
||||
.map_err(|_| anyhow::anyhow!("invalid key encoding"))?;
|
||||
let key = Ed25519KeyPair::from_pkcs8_maybe_unchecked(&bytes).context("parse key")?;
|
||||
if *key.public_key().as_ref() != *pubkey {
|
||||
bail!("private key file does not match certificate")
|
||||
}
|
||||
}
|
||||
oid => bail!("unknown TLS key type: {oid}"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1089,7 +1089,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
default_tenant_id: TenantId::from_array(std::array::from_fn(|_| 0)),
|
||||
storage_controller: None,
|
||||
control_plane_hooks_api: None,
|
||||
generate_local_ssl_certs: false,
|
||||
generate_local_tls_certs: false,
|
||||
generate_compute_tls_certs: false,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ impl StorageBroker {
|
||||
}
|
||||
|
||||
pub fn initialize(&self) -> anyhow::Result<()> {
|
||||
if self.env.generate_local_ssl_certs {
|
||||
if self.env.generate_local_tls_certs {
|
||||
self.env.generate_ssl_cert(
|
||||
&self.env.storage_broker_data_dir().join("server.crt"),
|
||||
&self.env.storage_broker_data_dir().join("server.key"),
|
||||
|
||||
@@ -54,7 +54,6 @@ use compute_api::requests::{
|
||||
};
|
||||
use compute_api::responses::{
|
||||
ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TerminateResponse,
|
||||
TlsConfig,
|
||||
};
|
||||
use compute_api::spec::{
|
||||
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PageserverProtocol,
|
||||
@@ -213,8 +212,13 @@ impl ComputeControlPlane {
|
||||
let internal_http_port = internal_http_port.unwrap_or_else(|| external_http_port + 1);
|
||||
let compute_ctl_config = ComputeCtlConfig {
|
||||
jwks: Self::create_jwks_from_pem(&self.env.read_public_key()?)?,
|
||||
tls: None::<TlsConfig>,
|
||||
tls: self.env.get_tls_config()?,
|
||||
};
|
||||
let mut features = vec![];
|
||||
if compute_ctl_config.tls.is_some() {
|
||||
features.push(ComputeFeature::TlsExperimental);
|
||||
}
|
||||
|
||||
let ep = Arc::new(Endpoint {
|
||||
endpoint_id: endpoint_id.to_owned(),
|
||||
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), pg_port),
|
||||
@@ -241,7 +245,7 @@ impl ComputeControlPlane {
|
||||
drop_subscriptions_before_start,
|
||||
grpc,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
features: features.clone(),
|
||||
cluster: None,
|
||||
compute_ctl_config: compute_ctl_config.clone(),
|
||||
privileged_role_name: privileged_role_name.clone(),
|
||||
@@ -263,7 +267,7 @@ impl ComputeControlPlane {
|
||||
skip_pg_catalog_updates,
|
||||
drop_subscriptions_before_start,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
features,
|
||||
cluster: None,
|
||||
compute_ctl_config,
|
||||
privileged_role_name,
|
||||
@@ -953,7 +957,7 @@ impl Endpoint {
|
||||
}
|
||||
// keep retrying
|
||||
}
|
||||
ComputeStatus::Running => {
|
||||
ComputeStatus::Reloading | ComputeStatus::Running => {
|
||||
// All good!
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use std::{env, fs};
|
||||
|
||||
use anyhow::{Context, bail};
|
||||
use clap::ValueEnum;
|
||||
use compute_api::responses::TlsConfig;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use pem::Pem;
|
||||
use postgres_backend::AuthType;
|
||||
@@ -95,7 +96,10 @@ pub struct LocalEnv {
|
||||
|
||||
/// Flag to generate SSL certificates for components that need it.
|
||||
/// Also generates root CA certificate that is used to sign all other certificates.
|
||||
pub generate_local_ssl_certs: bool,
|
||||
pub generate_local_tls_certs: bool,
|
||||
|
||||
/// Flag to generate SSL certificates for compute.
|
||||
pub generate_compute_tls_certs: bool,
|
||||
}
|
||||
|
||||
/// On-disk state stored in `.neon/config`.
|
||||
@@ -123,7 +127,11 @@ pub struct OnDiskConfig {
|
||||
// Note: skip serializing because in compat tests old storage controller fails
|
||||
// to load new config file. May be removed after this field is in release branch.
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
pub generate_local_ssl_certs: bool,
|
||||
pub generate_local_tls_certs: bool,
|
||||
// Note: skip serializing because in compat tests old storage controller fails
|
||||
// to load new config file. May be removed after this field is in release branch.
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
pub generate_compute_tls_certs: bool,
|
||||
}
|
||||
|
||||
fn fail_if_pageservers_field_specified<'de, D>(_: D) -> Result<Vec<PageServerConf>, D::Error>
|
||||
@@ -152,7 +160,8 @@ pub struct NeonLocalInitConf {
|
||||
pub endpoint_storage: EndpointStorageConf,
|
||||
pub control_plane_api: Option<Url>,
|
||||
pub control_plane_hooks_api: Option<Url>,
|
||||
pub generate_local_ssl_certs: bool,
|
||||
pub generate_local_tls_certs: bool,
|
||||
pub generate_compute_tls_certs: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
@@ -511,7 +520,7 @@ impl LocalEnv {
|
||||
}
|
||||
|
||||
pub fn ssl_ca_cert_path(&self) -> Option<PathBuf> {
|
||||
if self.generate_local_ssl_certs {
|
||||
if self.generate_local_tls_certs {
|
||||
Some(self.base_data_dir.join("rootCA.crt"))
|
||||
} else {
|
||||
None
|
||||
@@ -519,7 +528,7 @@ impl LocalEnv {
|
||||
}
|
||||
|
||||
pub fn ssl_ca_key_path(&self) -> Option<PathBuf> {
|
||||
if self.generate_local_ssl_certs {
|
||||
if self.generate_local_tls_certs {
|
||||
Some(self.base_data_dir.join("rootCA.key"))
|
||||
} else {
|
||||
None
|
||||
@@ -545,6 +554,33 @@ impl LocalEnv {
|
||||
)
|
||||
}
|
||||
|
||||
fn compute_ssl_paths(&self) -> Option<(PathBuf, PathBuf)> {
|
||||
if self.generate_compute_tls_certs {
|
||||
Some((
|
||||
self.base_data_dir.join("compute_server.crt"),
|
||||
self.base_data_dir.join("compute_server.key"),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn generate_compute_ssl_cert(&self) -> anyhow::Result<()> {
|
||||
self.generate_ssl_ca_cert()?;
|
||||
|
||||
let (cert_path, key_path) = self.compute_ssl_paths().unwrap();
|
||||
if !fs::exists(&cert_path)? {
|
||||
generate_ssl_cert(
|
||||
&cert_path,
|
||||
&key_path,
|
||||
self.ssl_ca_cert_path().unwrap().as_path(),
|
||||
self.ssl_ca_key_path().unwrap().as_path(),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates HTTP client with local SSL CA certificates.
|
||||
pub fn create_http_client(&self) -> reqwest::Client {
|
||||
let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| {
|
||||
@@ -673,7 +709,8 @@ impl LocalEnv {
|
||||
control_plane_hooks_api,
|
||||
control_plane_compute_hook_api: _,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
generate_local_tls_certs,
|
||||
generate_compute_tls_certs,
|
||||
endpoint_storage,
|
||||
} = on_disk_config;
|
||||
LocalEnv {
|
||||
@@ -690,7 +727,8 @@ impl LocalEnv {
|
||||
control_plane_api: control_plane_api.unwrap(),
|
||||
control_plane_hooks_api,
|
||||
branch_name_mappings,
|
||||
generate_local_ssl_certs,
|
||||
generate_local_tls_certs,
|
||||
generate_compute_tls_certs,
|
||||
endpoint_storage,
|
||||
}
|
||||
};
|
||||
@@ -806,7 +844,8 @@ impl LocalEnv {
|
||||
control_plane_hooks_api: self.control_plane_hooks_api.clone(),
|
||||
control_plane_compute_hook_api: None,
|
||||
branch_name_mappings: self.branch_name_mappings.clone(),
|
||||
generate_local_ssl_certs: self.generate_local_ssl_certs,
|
||||
generate_local_tls_certs: self.generate_local_tls_certs,
|
||||
generate_compute_tls_certs: self.generate_compute_tls_certs,
|
||||
endpoint_storage: self.endpoint_storage.clone(),
|
||||
},
|
||||
)
|
||||
@@ -861,6 +900,21 @@ impl LocalEnv {
|
||||
Ok(pem)
|
||||
}
|
||||
|
||||
/// Get the TLS config if set.
|
||||
pub fn get_tls_config(&self) -> anyhow::Result<Option<TlsConfig>> {
|
||||
match self.compute_ssl_paths() {
|
||||
Some((cert_path, key_path)) => {
|
||||
self.generate_compute_ssl_cert()?;
|
||||
|
||||
Ok(Some(TlsConfig {
|
||||
key_path: key_path.to_str().context("utf8")?.to_string(),
|
||||
cert_path: cert_path.to_str().context("utf8")?.to_string(),
|
||||
}))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Materialize the [`NeonLocalInitConf`] to disk. Called during [`neon_local init`].
|
||||
pub fn init(conf: NeonLocalInitConf, force: &InitForceMode) -> anyhow::Result<()> {
|
||||
let base_path = base_path();
|
||||
@@ -912,7 +966,8 @@ impl LocalEnv {
|
||||
pageservers,
|
||||
safekeepers,
|
||||
control_plane_api,
|
||||
generate_local_ssl_certs,
|
||||
generate_local_tls_certs,
|
||||
generate_compute_tls_certs,
|
||||
control_plane_hooks_api,
|
||||
endpoint_storage,
|
||||
} = conf;
|
||||
@@ -965,13 +1020,17 @@ impl LocalEnv {
|
||||
control_plane_api: control_plane_api.unwrap(),
|
||||
control_plane_hooks_api,
|
||||
branch_name_mappings: Default::default(),
|
||||
generate_local_ssl_certs,
|
||||
generate_local_tls_certs,
|
||||
generate_compute_tls_certs,
|
||||
endpoint_storage,
|
||||
};
|
||||
|
||||
if generate_local_ssl_certs {
|
||||
if generate_local_tls_certs {
|
||||
env.generate_ssl_ca_cert()?;
|
||||
}
|
||||
if generate_compute_tls_certs {
|
||||
env.generate_compute_ssl_cert()?;
|
||||
}
|
||||
|
||||
// create endpoints dir
|
||||
fs::create_dir_all(env.endpoints_path())?;
|
||||
|
||||
@@ -241,7 +241,7 @@ impl PageServerNode {
|
||||
.context("write identity toml")?;
|
||||
drop(identity_toml);
|
||||
|
||||
if self.env.generate_local_ssl_certs {
|
||||
if self.env.generate_local_tls_certs {
|
||||
self.env.generate_ssl_cert(
|
||||
datadir.join("server.crt").as_path(),
|
||||
datadir.join("server.key").as_path(),
|
||||
|
||||
@@ -102,7 +102,7 @@ impl SafekeeperNode {
|
||||
/// Initializes a safekeeper node by creating all necessary files,
|
||||
/// e.g. SSL certificates and JWT token file.
|
||||
pub fn initialize(&self) -> anyhow::Result<()> {
|
||||
if self.env.generate_local_ssl_certs {
|
||||
if self.env.generate_local_tls_certs {
|
||||
self.env.generate_ssl_cert(
|
||||
&self.datadir_path().join("server.crt"),
|
||||
&self.datadir_path().join("server.key"),
|
||||
|
||||
@@ -353,7 +353,7 @@ impl StorageController {
|
||||
}
|
||||
}
|
||||
|
||||
if self.env.generate_local_ssl_certs {
|
||||
if self.env.generate_local_tls_certs {
|
||||
self.env.generate_ssl_cert(
|
||||
&instance_dir.join("server.crt"),
|
||||
&instance_dir.join("server.key"),
|
||||
|
||||
@@ -1,246 +0,0 @@
|
||||
# Node deletion API improvement
|
||||
|
||||
Created on 2025-07-07
|
||||
Implemented on _TBD_
|
||||
|
||||
## Summary
|
||||
|
||||
This RFC describes improvements to the storage controller API for gracefully deleting pageserver
|
||||
nodes.
|
||||
|
||||
## Motivation
|
||||
|
||||
The basic node deletion API introduced in [#8226](https://github.com/neondatabase/neon/issues/8333)
|
||||
has several limitations:
|
||||
|
||||
- Deleted nodes can re-add themselves if they restart (e.g., a flaky node that keeps restarting and
|
||||
we cannot reach via SSH to stop the pageserver). This issue has been resolved by tombstone
|
||||
mechanism in [#12036](https://github.com/neondatabase/neon/issues/12036)
|
||||
- Process of node deletion is not graceful, i.e. it just imitates a node failure
|
||||
|
||||
In this context, "graceful" node deletion means that users do not experience any disruption or
|
||||
negative effects, provided the system remains in a healthy state (i.e., the remaining pageservers
|
||||
can handle the workload and all requirements are met). To achieve this, the system must perform
|
||||
live migration of all tenant shards from the node being deleted while the node is still running
|
||||
and continue processing all incoming requests. The node is removed only after all tenant shards
|
||||
have been safely migrated.
|
||||
|
||||
Although live migrations can be achieved with the drain functionality, it leads to incorrect shard
|
||||
placement, such as not matching availability zones. This results in unnecessary work to optimize
|
||||
the placement that was just recently performed.
|
||||
|
||||
If we delete a node before its tenant shards are fully moved, the new node won't have all the
|
||||
needed data (e.g. heatmaps) ready. This means user requests to the new node will be much slower at
|
||||
first. If there are many tenant shards, this slowdown affects a huge amount of users.
|
||||
|
||||
Graceful node deletion is more complicated and can introduce new issues. It takes longer because
|
||||
live migration of each tenant shard can last several minutes. Using non-blocking accessors may
|
||||
also cause deletion to wait if other processes are holding inner state lock. It also gets trickier
|
||||
because we need to handle other requests, like drain and fill, at the same time.
|
||||
|
||||
## Impacted components (e.g. pageserver, safekeeper, console, etc)
|
||||
|
||||
- storage controller
|
||||
- pageserver (indirectly)
|
||||
|
||||
## Proposed implementation
|
||||
|
||||
### Tombstones
|
||||
|
||||
To resolve the problem of deleted nodes re-adding themselves, a tombstone mechanism was introduced
|
||||
as part of the node stored information. Each node has a separate `NodeLifecycle` field with two
|
||||
possible states: `Active` and `Deleted`. When node deletion completes, the database row is not
|
||||
deleted but instead has its `NodeLifecycle` column switched to `Deleted`. Nodes with `Deleted`
|
||||
lifecycle are treated as if the row is absent for most handlers, with several exceptions: reattach
|
||||
and register functionality must be aware of tombstones. Additionally, new debug handlers are
|
||||
available for listing and deleting tombstones via the `/debug/v1/tombstone` path.
|
||||
|
||||
### Gracefulness
|
||||
|
||||
The problem of making node deletion graceful is complex and involves several challenges:
|
||||
|
||||
- **Cancellable**: The operation must be cancellable to allow administrators to abort the process
|
||||
if needed, e.g. if run by mistake.
|
||||
- **Non-blocking**: We don't want to block deployment operations like draining/filling on the node
|
||||
deletion process. We need clear policies for handling concurrent operations: what happens when a
|
||||
drain/fill request arrives while deletion is in progress, and what happens when a delete request
|
||||
arrives while drain/fill is in progress.
|
||||
- **Persistent**: If the storage controller restarts during this long-running operation, we must
|
||||
preserve progress and automatically resume the deletion process after the storage controller
|
||||
restarts.
|
||||
- **Migrated correctly**: We cannot simply use the existing drain mechanism for nodes scheduled
|
||||
for deletion, as this would move shards to irrelevant locations. The drain process expects the
|
||||
node to return, so it only moves shards to backup locations, not to their preferred AZs. It also
|
||||
leaves secondary locations unmoved. This could result in unnecessary load on the storage
|
||||
controller and inefficient resource utilization.
|
||||
- **Force option**: Administrators need the ability to force immediate, non-graceful deletion when
|
||||
time constraints or emergency situations require it, bypassing the normal graceful migration
|
||||
process.
|
||||
|
||||
See below for a detailed breakdown of the proposed changes and mechanisms.
|
||||
|
||||
#### Node lifecycle
|
||||
|
||||
New `NodeLifecycle` enum and a matching database field with these values:
|
||||
- `Active`: The normal state. All operations are allowed.
|
||||
- `ScheduledForDeletion`: The node is marked to be deleted soon. Deletion may be in progress or
|
||||
will happen later, but the node will eventually be removed. All operations are allowed.
|
||||
- `Deleted`: The node is fully deleted. No operations are allowed, and the node cannot be brought
|
||||
back. The only action left is to remove its record from the database. Any attempt to register a
|
||||
node in this state will fail.
|
||||
|
||||
This state persists across storage controller restarts.
|
||||
|
||||
**State transition**
|
||||
```
|
||||
+--------------------+
|
||||
+---| Active |<---------------------+
|
||||
| +--------------------+ |
|
||||
| ^ |
|
||||
| start_node_delete | cancel_node_delete |
|
||||
v | |
|
||||
+----------------------------------+ |
|
||||
| ScheduledForDeletion | |
|
||||
+----------------------------------+ |
|
||||
| |
|
||||
| node_register |
|
||||
| |
|
||||
| delete_node (at the finish) |
|
||||
| |
|
||||
v |
|
||||
+---------+ tombstone_delete +----------+
|
||||
| Deleted |-------------------------------->| no row |
|
||||
+---------+ +----------+
|
||||
```
|
||||
|
||||
#### NodeSchedulingPolicy::Deleting
|
||||
|
||||
A `Deleting` variant to the `NodeSchedulingPolicy` enum. This means the deletion function is
|
||||
running for the node right now. Only one node can have the `Deleting` policy at a time.
|
||||
|
||||
The `NodeSchedulingPolicy::Deleting` state is persisted in the database. However, after a storage
|
||||
controller restart, any node previously marked as `Deleting` will have its scheduling policy reset
|
||||
to `Pause`. The policy will only transition back to `Deleting` when the deletion operation is
|
||||
actively started again, as triggered by the node's `NodeLifecycle::ScheduledForDeletion` state.
|
||||
|
||||
`NodeSchedulingPolicy` transition details:
|
||||
1. When `node_delete` begins, set the policy to `NodeSchedulingPolicy::Deleting`.
|
||||
2. If `node_delete` is cancelled (for example, due to a concurrent drain operation), revert the
|
||||
policy to its previous value. The policy is persisted in storcon DB.
|
||||
3. After `node_delete` completes, the final value of the scheduling policy is irrelevant, since
|
||||
`NodeLifecycle::Deleted` prevents any further access to this field.
|
||||
|
||||
The deletion process cannot be initiated for nodes currently undergoing deployment-related
|
||||
operations (`Draining`, `Filling`, or `PauseForRestart` policies). Deletion will only be triggered
|
||||
once the node transitions to either the `Active` or `Pause` state.
|
||||
|
||||
#### OperationTracker
|
||||
|
||||
A replacement for `Option<OperationHandler> ongoing_operation`, the `OperationTracker` is a
|
||||
dedicated service state object responsible for managing all long-running node operations (drain,
|
||||
fill, delete) with robust concurrency control.
|
||||
|
||||
Key responsibilities:
|
||||
- Orchestrates the execution of operations
|
||||
- Supports cancellation of currently running operations
|
||||
- Enforces operation constraints, e.g. allowing only single drain/fill operation at a time
|
||||
- Persists deletion state, enabling recovery of pending deletions across restarts
|
||||
- Ensures thread safety across concurrent requests
|
||||
|
||||
#### Attached tenant shard processing
|
||||
|
||||
When deleting a node, handle each attached tenant shard as follows:
|
||||
|
||||
1. Pick the best node to become the new attached (the candidate).
|
||||
2. If the candidate already has this shard as a secondary:
|
||||
- Create a new secondary for the shard on another suitable node.
|
||||
Otherwise:
|
||||
- Create a secondary for the shard on the candidate node.
|
||||
3. Wait until all secondaries are ready and pre-warmed.
|
||||
4. Promote the candidate's secondary to attached.
|
||||
5. Remove the secondary from the node being deleted.
|
||||
|
||||
This process safely moves all attached shards before deleting the node.
|
||||
|
||||
#### Secondary tenant shard processing
|
||||
|
||||
When deleting a node, handle each secondary tenant shard as follows:
|
||||
|
||||
1. Choose the best node to become the new secondary.
|
||||
2. Create a secondary for the shard on that node.
|
||||
3. Wait until the new secondary is ready.
|
||||
4. Remove the secondary from the node being deleted.
|
||||
|
||||
This ensures all secondary shards are safely moved before deleting the node.
|
||||
|
||||
### Reliability, failure modes and corner cases
|
||||
|
||||
In case of a storage controller failure and following restart, the system behavior depends on the
|
||||
`NodeLifecycle` state:
|
||||
|
||||
- If `NodeLifecycle` is `Active`: No action is taken for this node.
|
||||
- If `NodeLifecycle` is `Deleted`: The node will not be re-added.
|
||||
- If `NodeLifecycle` is `ScheduledForDeletion`: A deletion background task will be launched for
|
||||
this node.
|
||||
|
||||
In case of a pageserver node failure during deletion, the behavior depends on the `force` flag:
|
||||
- If `force` is set: The node deletion will proceed regardless of the node's availability.
|
||||
- If `force` is not set: The deletion will be retried a limited number of times. If the node
|
||||
remains unavailable, the deletion process will pause and automatically resume when the node
|
||||
becomes healthy again.
|
||||
|
||||
### Operations concurrency
|
||||
|
||||
The following sections describe the behavior when different types of requests arrive at the storage
|
||||
controller and how they interact with ongoing operations.
|
||||
|
||||
#### Delete request
|
||||
|
||||
Handler: `PUT /control/v1/node/:node_id/delete`
|
||||
|
||||
1. If node lifecycle is `NodeLifecycle::ScheduledForDeletion`:
|
||||
- Return `200 OK`: there is already an ongoing deletion request for this node
|
||||
2. Update & persist lifecycle to `NodeLifecycle::ScheduledForDeletion`
|
||||
3. Persist current scheduling policy
|
||||
4. If there is no active operation (drain/fill/delete):
|
||||
- Run deletion process for this node
|
||||
|
||||
#### Cancel delete request
|
||||
|
||||
Handler: `DELETE /control/v1/node/:node_id/delete`
|
||||
|
||||
1. If node lifecycle is not `NodeLifecycle::ScheduledForDeletion`:
|
||||
- Return `404 Not Found`: there is no current deletion request for this node
|
||||
2. If the active operation is deleting this node, cancel it
|
||||
3. Update & persist lifecycle to `NodeLifecycle::Active`
|
||||
4. Restore the last scheduling policy from persistence
|
||||
|
||||
#### Drain/fill request
|
||||
|
||||
1. If there are already ongoing drain/fill processes:
|
||||
- Return `409 Conflict`: queueing of drain/fill processes is not supported
|
||||
2. If there is an ongoing delete process:
|
||||
- Cancel it and wait until it is cancelled
|
||||
3. Run the drain/fill process
|
||||
4. After the drain/fill process is cancelled or finished:
|
||||
- Try to find another candidate to delete and run the deletion process for that node
|
||||
|
||||
#### Drain/fill cancel request
|
||||
|
||||
1. If the active operation is not the related process:
|
||||
- Return `400 Bad Request`: cancellation request is incorrect, operations are not the same
|
||||
2. Cancel the active operation
|
||||
3. Try to find another candidate to delete and run the deletion process for that node
|
||||
|
||||
## Definition of Done
|
||||
|
||||
- [x] Fix flaky node scenario and introduce related debug handlers
|
||||
- [ ] Node deletion intent is persistent - a node will be eventually deleted after a deletion
|
||||
request regardless of draining/filling requests and restarts
|
||||
- [ ] Node deletion can be graceful - deletion completes only after moving all tenant shards to
|
||||
recommended locations
|
||||
- [ ] Deploying does not break due to long deletions - drain/fill operations override deletion
|
||||
process and deletion resumes after drain/fill completes
|
||||
- [ ] `force` flag is implemented and provides fast, failure-tolerant node removal (e.g., when a
|
||||
pageserver node does not respond)
|
||||
- [ ] Legacy delete handler code is removed from storage_controller, test_runner, and storcon_cli
|
||||
@@ -27,7 +27,6 @@ pub struct ComputeConfig {
|
||||
pub spec: Option<ComputeSpec>,
|
||||
|
||||
/// The compute_ctl configuration
|
||||
#[allow(dead_code)]
|
||||
pub compute_ctl_config: ComputeCtlConfig,
|
||||
}
|
||||
|
||||
@@ -68,15 +67,11 @@ pub enum LfcPrewarmState {
|
||||
/// We tried to fetch the corresponding LFC state from the endpoint storage,
|
||||
/// but received `Not Found 404`. This should normally happen only during the
|
||||
/// first endpoint start after creation with `autoprewarm: true`.
|
||||
/// This may also happen if LFC is turned off or not initialized
|
||||
///
|
||||
/// During the orchestrated prewarm via API, when a caller explicitly
|
||||
/// provides the LFC state key to prewarm from, it's the caller responsibility
|
||||
/// to handle this status as an error state in this case.
|
||||
Skipped,
|
||||
/// LFC prewarm was cancelled. Some pages in LFC cache may be prewarmed if query
|
||||
/// has started working before cancellation
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl Display for LfcPrewarmState {
|
||||
@@ -87,7 +82,6 @@ impl Display for LfcPrewarmState {
|
||||
LfcPrewarmState::Completed => f.write_str("Completed"),
|
||||
LfcPrewarmState::Skipped => f.write_str("Skipped"),
|
||||
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
|
||||
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -102,7 +96,6 @@ pub enum LfcOffloadState {
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
Skipped,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone, PartialEq)]
|
||||
@@ -161,6 +154,8 @@ pub enum ComputeStatus {
|
||||
Empty,
|
||||
// Compute configuration was requested.
|
||||
ConfigurationPending,
|
||||
// Postgres, pgbouncer, and local_proxy is currently being reloaded.
|
||||
Reloading,
|
||||
// Compute node has spec and initial startup and
|
||||
// configuration is in progress.
|
||||
Init,
|
||||
@@ -195,6 +190,7 @@ impl Display for ComputeStatus {
|
||||
match self {
|
||||
ComputeStatus::Empty => f.write_str("empty"),
|
||||
ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
|
||||
ComputeStatus::Reloading => f.write_str("reloading"),
|
||||
ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
|
||||
ComputeStatus::RefreshConfigurationPending => {
|
||||
f.write_str("refresh-configuration-pending")
|
||||
|
||||
@@ -341,34 +341,6 @@ extern "C-unwind" fn log_internal(
|
||||
}
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
extern "C" fn reset_safekeeper_statuses_for_metrics(wp: *mut WalProposer, num_safekeepers: u32) {
|
||||
unsafe {
|
||||
let callback_data = (*(*wp).config).callback_data;
|
||||
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||
if api.is_null() {
|
||||
return;
|
||||
}
|
||||
(*api).reset_safekeeper_statuses_for_metrics(&mut (*wp), num_safekeepers);
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" fn update_safekeeper_status_for_metrics(
|
||||
wp: *mut WalProposer,
|
||||
sk_index: u32,
|
||||
status: u8,
|
||||
) {
|
||||
unsafe {
|
||||
let callback_data = (*(*wp).config).callback_data;
|
||||
let api = callback_data as *mut Box<dyn ApiImpl>;
|
||||
if api.is_null() {
|
||||
return;
|
||||
}
|
||||
(*api).update_safekeeper_status_for_metrics(&mut (*wp), sk_index, status);
|
||||
}
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Level {
|
||||
Debug5,
|
||||
@@ -442,10 +414,6 @@ pub(crate) fn create_api() -> walproposer_api {
|
||||
finish_sync_safekeepers: Some(finish_sync_safekeepers),
|
||||
process_safekeeper_feedback: Some(process_safekeeper_feedback),
|
||||
log_internal: Some(log_internal),
|
||||
/* BEGIN_HADRON */
|
||||
reset_safekeeper_statuses_for_metrics: Some(reset_safekeeper_statuses_for_metrics),
|
||||
update_safekeeper_status_for_metrics: Some(update_safekeeper_status_for_metrics),
|
||||
/* END_HADRON */
|
||||
}
|
||||
}
|
||||
|
||||
@@ -483,8 +451,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
replica_promote: false,
|
||||
min_ps_feedback: empty_feedback,
|
||||
wal_rate_limiter: empty_wal_rate_limiter,
|
||||
num_safekeepers: 0,
|
||||
safekeeper_status: [0; 32],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -159,21 +159,6 @@ pub trait ApiImpl {
|
||||
fn after_election(&self, _wp: &mut WalProposer) {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
fn reset_safekeeper_statuses_for_metrics(&self, _wp: &mut WalProposer, _num_safekeepers: u32) {
|
||||
// Do nothing for testing purposes.
|
||||
}
|
||||
|
||||
fn update_safekeeper_status_for_metrics(
|
||||
&self,
|
||||
_wp: &mut WalProposer,
|
||||
_sk_index: u32,
|
||||
_status: u8,
|
||||
) {
|
||||
// Do nothing for testing purposes.
|
||||
}
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -49,7 +49,6 @@
|
||||
#include "neon.h"
|
||||
#include "neon_lwlsncache.h"
|
||||
#include "neon_perf_counters.h"
|
||||
#include "neon_utils.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "communicator.h"
|
||||
|
||||
@@ -674,19 +673,8 @@ lfc_get_state(size_t max_entries)
|
||||
{
|
||||
if (GET_STATE(entry, j) != UNAVAILABLE)
|
||||
{
|
||||
/* Validate the buffer tag before including it */
|
||||
BufferTag test_tag = entry->key;
|
||||
test_tag.blockNum += j;
|
||||
|
||||
if (BufferTagIsValid(&test_tag))
|
||||
{
|
||||
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
|
||||
n_pages += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(ERROR, "LFC: Skipping invalid buffer tag during cache state capture: blockNum=%u", test_tag.blockNum);
|
||||
}
|
||||
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
|
||||
n_pages += 1;
|
||||
}
|
||||
}
|
||||
if (++i == n_entries)
|
||||
@@ -695,7 +683,7 @@ lfc_get_state(size_t max_entries)
|
||||
Assert(i == n_entries);
|
||||
fcs->n_pages = n_pages;
|
||||
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
|
||||
elog(LOG, "LFC: save state of %d chunks %d pages (validated)", (int)n_entries, (int)n_pages);
|
||||
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
|
||||
}
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
@@ -714,7 +702,6 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
|
||||
size_t n_entries;
|
||||
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
|
||||
size_t fcs_size;
|
||||
uint32_t max_prefetch_pages;
|
||||
dsm_segment *seg;
|
||||
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
|
||||
|
||||
@@ -759,11 +746,6 @@ lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
|
||||
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
|
||||
Assert(n_entries != 0);
|
||||
|
||||
max_prefetch_pages = n_entries << fcs_chunk_size_log;
|
||||
if (fcs->n_pages > max_prefetch_pages) {
|
||||
elog(ERROR, "LFC: Number of pages in file cache state (%d) is more than the limit (%d)", fcs->n_pages, max_prefetch_pages);
|
||||
}
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
/* Do not prewarm more entries than LFC limit */
|
||||
@@ -916,11 +898,6 @@ lfc_prewarm_main(Datum main_arg)
|
||||
{
|
||||
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
|
||||
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
|
||||
|
||||
if (!BufferTagIsValid(&tag)) {
|
||||
elog(ERROR, "LFC: Invalid buffer tag: %u", tag.blockNum);
|
||||
}
|
||||
|
||||
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
|
||||
{
|
||||
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
|
||||
|
||||
@@ -391,12 +391,6 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
|
||||
neon_per_backend_counters totals = {0};
|
||||
metric_t *metrics;
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
WalproposerShmemState *wp_shmem;
|
||||
uint32 num_safekeepers;
|
||||
uint32 num_active_safekeepers;
|
||||
/* END_HADRON */
|
||||
|
||||
/* We put all the tuples into a tuplestore in one go. */
|
||||
InitMaterializedSRF(fcinfo, 0);
|
||||
|
||||
@@ -443,32 +437,11 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
|
||||
// Not ideal but piggyback our databricks counters into the neon perf counters view
|
||||
// so that we don't need to introduce neon--1.x+1.sql to add a new view.
|
||||
{
|
||||
// Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
|
||||
// the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
|
||||
|
||||
// Read safekeeper status from wal proposer shared memory first.
|
||||
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
|
||||
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
|
||||
// not be a huge deal.
|
||||
wp_shmem = GetWalpropShmemState();
|
||||
SpinLockAcquire(&wp_shmem->mutex);
|
||||
num_safekeepers = wp_shmem->num_safekeepers;
|
||||
num_active_safekeepers = 0;
|
||||
for (int i = 0; i < num_safekeepers; i++) {
|
||||
if (wp_shmem->safekeeper_status[i] == 1) {
|
||||
num_active_safekeepers++;
|
||||
}
|
||||
}
|
||||
SpinLockRelease(&wp_shmem->mutex);
|
||||
}
|
||||
{
|
||||
metric_t databricks_metrics[] = {
|
||||
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
|
||||
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
|
||||
{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
|
||||
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
|
||||
{"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
|
||||
{"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
|
||||
{NULL, false, 0, 0},
|
||||
};
|
||||
for (int i = 0; databricks_metrics[i].name != NULL; i++)
|
||||
|
||||
@@ -183,22 +183,3 @@ alloc_curl_handle(void)
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Check if a BufferTag is valid by verifying all its fields are not invalid.
|
||||
*/
|
||||
bool
|
||||
BufferTagIsValid(const BufferTag *tag)
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM >= 16
|
||||
return (tag->spcOid != InvalidOid) &&
|
||||
(tag->relNumber != InvalidRelFileNumber) &&
|
||||
(tag->forkNum != InvalidForkNumber) &&
|
||||
(tag->blockNum != InvalidBlockNumber);
|
||||
#else
|
||||
return (tag->rnode.spcNode != InvalidOid) &&
|
||||
(tag->rnode.relNode != InvalidOid) &&
|
||||
(tag->forkNum != InvalidForkNumber) &&
|
||||
(tag->blockNum != InvalidBlockNumber);
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
#define __NEON_UTILS_H__
|
||||
|
||||
#include "lib/stringinfo.h"
|
||||
#include "storage/buf_internals.h"
|
||||
|
||||
#ifndef WALPROPOSER_LIB
|
||||
#include <curl/curl.h>
|
||||
@@ -17,9 +16,6 @@ void pq_sendint32_le(StringInfo buf, uint32 i);
|
||||
void pq_sendint64_le(StringInfo buf, uint64 i);
|
||||
void disable_core_dump(void);
|
||||
|
||||
/* Buffer tag validation function */
|
||||
bool BufferTagIsValid(const BufferTag *tag);
|
||||
|
||||
#ifndef WALPROPOSER_LIB
|
||||
|
||||
CURL * alloc_curl_handle(void);
|
||||
|
||||
@@ -154,9 +154,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
|
||||
wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
|
||||
wp->safekeeper[wp->n_safekeepers].wp = wp;
|
||||
/* BEGIN_HADRON */
|
||||
wp->safekeeper[wp->n_safekeepers].index = wp->n_safekeepers;
|
||||
/* END_HADRON */
|
||||
|
||||
{
|
||||
Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
|
||||
int written = 0;
|
||||
@@ -185,10 +183,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
if (wp->safekeepers_generation > INVALID_GENERATION && wp->config->proto_version < 3)
|
||||
wp_log(FATAL, "enabling generations requires protocol version 3");
|
||||
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
wp->api.reset_safekeeper_statuses_for_metrics(wp, wp->n_safekeepers);
|
||||
/* END_HADRON */
|
||||
|
||||
/* Fill the greeting package */
|
||||
wp->greetRequest.pam.tag = 'g';
|
||||
@@ -361,10 +355,6 @@ ShutdownConnection(Safekeeper *sk)
|
||||
sk->state = SS_OFFLINE;
|
||||
sk->streamingAt = InvalidXLogRecPtr;
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 0);
|
||||
/* END_HADRON */
|
||||
|
||||
MembershipConfigurationFree(&sk->greetResponse.mconf);
|
||||
if (sk->voteResponse.termHistory.entries)
|
||||
pfree(sk->voteResponse.termHistory.entries);
|
||||
@@ -1540,10 +1530,6 @@ StartStreaming(Safekeeper *sk)
|
||||
sk->active_state = SS_ACTIVE_SEND;
|
||||
sk->streamingAt = sk->startStreamingAt;
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 1);
|
||||
/* END_HADRON */
|
||||
|
||||
/*
|
||||
* Donors can only be in SS_ACTIVE state, so we potentially update the
|
||||
* donor when we switch one to SS_ACTIVE.
|
||||
|
||||
@@ -432,10 +432,6 @@ typedef struct WalproposerShmemState
|
||||
/* BEGIN_HADRON */
|
||||
/* The WAL rate limiter */
|
||||
WalRateLimiter wal_rate_limiter;
|
||||
/* Number of safekeepers in the config */
|
||||
uint32 num_safekeepers;
|
||||
/* Per-safekeeper status flags: 0=inactive, 1=active */
|
||||
uint8 safekeeper_status[MAX_SAFEKEEPERS];
|
||||
/* END_HADRON */
|
||||
} WalproposerShmemState;
|
||||
|
||||
@@ -487,11 +483,6 @@ typedef struct Safekeeper
|
||||
char const *host;
|
||||
char const *port;
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
/* index of this safekeeper in the WalProposer array */
|
||||
uint32 index;
|
||||
/* END_HADRON */
|
||||
|
||||
/*
|
||||
* connection string for connecting/reconnecting.
|
||||
*
|
||||
@@ -740,23 +731,6 @@ typedef struct walproposer_api
|
||||
* handled by elog().
|
||||
*/
|
||||
void (*log_internal) (WalProposer *wp, int level, const char *line);
|
||||
|
||||
/*
|
||||
* BEGIN_HADRON
|
||||
* APIs manipulating shared memory state used for Safekeeper quorum health metrics.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Reset the safekeeper statuses in shared memory for metric purposes.
|
||||
*/
|
||||
void (*reset_safekeeper_statuses_for_metrics) (WalProposer *wp, uint32 num_safekeepers);
|
||||
|
||||
/*
|
||||
* Update the safekeeper status in shared memory for metric purposes.
|
||||
*/
|
||||
void (*update_safekeeper_status_for_metrics) (WalProposer *wp, uint32 sk_index, uint8 status);
|
||||
|
||||
/* END_HADRON */
|
||||
} walproposer_api;
|
||||
|
||||
/*
|
||||
|
||||
@@ -2261,27 +2261,6 @@ GetNeonCurrentClusterSize(void)
|
||||
}
|
||||
uint64 GetNeonCurrentClusterSize(void);
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
static void
|
||||
walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers)
|
||||
{
|
||||
WalproposerShmemState* shmem = wp->api.get_shmem_state(wp);
|
||||
SpinLockAcquire(&shmem->mutex);
|
||||
shmem->num_safekeepers = num_safekeepers;
|
||||
memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
|
||||
SpinLockRelease(&shmem->mutex);
|
||||
}
|
||||
|
||||
static void
|
||||
walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status)
|
||||
{
|
||||
WalproposerShmemState* shmem = wp->api.get_shmem_state(wp);
|
||||
Assert(sk_index < MAX_SAFEKEEPERS);
|
||||
SpinLockAcquire(&shmem->mutex);
|
||||
shmem->safekeeper_status[sk_index] = status;
|
||||
SpinLockRelease(&shmem->mutex);
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
static const walproposer_api walprop_pg = {
|
||||
.get_shmem_state = walprop_pg_get_shmem_state,
|
||||
@@ -2315,6 +2294,4 @@ static const walproposer_api walprop_pg = {
|
||||
.finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers,
|
||||
.process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
|
||||
.log_internal = walprop_pg_log_internal,
|
||||
.reset_safekeeper_statuses_for_metrics = walprop_pg_reset_safekeeper_statuses_for_metrics,
|
||||
.update_safekeeper_status_for_metrics = walprop_pg_update_safekeeper_status_for_metrics,
|
||||
};
|
||||
|
||||
@@ -700,10 +700,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
ip_allowlist_check_enabled: !args.is_private_access_proxy,
|
||||
is_vpc_acccess_proxy: args.is_private_access_proxy,
|
||||
is_auth_broker: args.is_auth_broker,
|
||||
#[cfg(not(feature = "rest_broker"))]
|
||||
accept_jwts: args.is_auth_broker,
|
||||
#[cfg(feature = "rest_broker")]
|
||||
accept_jwts: args.is_auth_broker || args.is_rest_broker,
|
||||
console_redirect_confirmation_timeout: args.webauth_confirmation_timeout,
|
||||
};
|
||||
|
||||
|
||||
@@ -458,7 +458,7 @@ pub(crate) enum LocalProxyConnError {
|
||||
impl ReportableError for HttpConnError {
|
||||
fn get_error_kind(&self) -> ErrorKind {
|
||||
match self {
|
||||
HttpConnError::ConnectError(e) => e.get_error_kind(),
|
||||
HttpConnError::ConnectError(_) => ErrorKind::Compute,
|
||||
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
|
||||
HttpConnError::PostgresConnectionError(p) => match p.as_db_error() {
|
||||
// user provided a wrong database name
|
||||
|
||||
@@ -612,25 +612,19 @@ pub async fn handle_request(
|
||||
}
|
||||
}
|
||||
|
||||
let max_term = statuses
|
||||
.iter()
|
||||
.map(|(status, _)| status.acceptor_state.term)
|
||||
.max()
|
||||
.unwrap();
|
||||
|
||||
// Find the most advanced safekeeper
|
||||
let (status, i) = statuses
|
||||
.into_iter()
|
||||
.max_by_key(|(status, _)| {
|
||||
(
|
||||
status.acceptor_state.epoch,
|
||||
status.flush_lsn,
|
||||
/* BEGIN_HADRON */
|
||||
// We need to pull from the SK with the highest term.
|
||||
// This is because another compute may come online and vote the same highest term again on the other two SKs.
|
||||
// Then, there will be 2 computes running on the same term.
|
||||
status.acceptor_state.term,
|
||||
/* END_HADRON */
|
||||
status.flush_lsn,
|
||||
status.commit_lsn,
|
||||
)
|
||||
})
|
||||
@@ -640,22 +634,6 @@ pub async fn handle_request(
|
||||
assert!(status.tenant_id == request.tenant_id);
|
||||
assert!(status.timeline_id == request.timeline_id);
|
||||
|
||||
// TODO(diko): This is hadron only check to make sure that we pull the timeline
|
||||
// from the safekeeper with the highest term during timeline restore.
|
||||
// We could avoid returning the error by calling bump_term after pull_timeline.
|
||||
// However, this is not a big deal because we retry the pull_timeline requests.
|
||||
// The check should be removed together with removing custom hadron logic for
|
||||
// safekeeper restore.
|
||||
if wait_for_peer_timeline_status && status.acceptor_state.term != max_term {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!(
|
||||
"choosen safekeeper {} has term {}, but the most advanced term is {}",
|
||||
safekeeper_host, status.acceptor_state.term, max_term
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
|
||||
match pull_timeline(
|
||||
status,
|
||||
safekeeper_host,
|
||||
|
||||
@@ -195,14 +195,12 @@ impl StateSK {
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
let result = self.state_mut().membership_switch(to).await?;
|
||||
let flush_lsn = self.flush_lsn();
|
||||
let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn);
|
||||
|
||||
Ok(TimelineMembershipSwitchResponse {
|
||||
previous_conf: result.previous_conf,
|
||||
current_conf: result.current_conf,
|
||||
last_log_term,
|
||||
flush_lsn,
|
||||
last_log_term: self.state().acceptor_state.term,
|
||||
flush_lsn: self.flush_lsn(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -471,17 +471,11 @@ impl Persistence {
|
||||
&self,
|
||||
input_node_id: NodeId,
|
||||
input_https_port: Option<u16>,
|
||||
input_grpc_addr: Option<String>,
|
||||
input_grpc_port: Option<u16>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::nodes::dsl::*;
|
||||
self.update_node(
|
||||
input_node_id,
|
||||
(
|
||||
listen_https_port.eq(input_https_port.map(|x| x as i32)),
|
||||
listen_grpc_addr.eq(input_grpc_addr),
|
||||
listen_grpc_port.eq(input_grpc_port.map(|x| x as i32)),
|
||||
),
|
||||
listen_https_port.eq(input_https_port.map(|x| x as i32)),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -7813,7 +7813,7 @@ impl Service {
|
||||
register_req.listen_https_port,
|
||||
register_req.listen_pg_addr,
|
||||
register_req.listen_pg_port,
|
||||
register_req.listen_grpc_addr.clone(),
|
||||
register_req.listen_grpc_addr,
|
||||
register_req.listen_grpc_port,
|
||||
register_req.availability_zone_id.clone(),
|
||||
self.config.use_https_pageserver_api,
|
||||
@@ -7848,8 +7848,6 @@ impl Service {
|
||||
.update_node_on_registration(
|
||||
register_req.node_id,
|
||||
register_req.listen_https_port,
|
||||
register_req.listen_grpc_addr,
|
||||
register_req.listen_grpc_port,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
|
||||
@@ -24,12 +24,12 @@ use pageserver_api::controller_api::{
|
||||
};
|
||||
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
use safekeeper_api::PgVersionId;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
|
||||
use safekeeper_api::models::{
|
||||
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
|
||||
TimelineMembershipSwitchResponse,
|
||||
};
|
||||
use safekeeper_api::{INITIAL_TERM, Term};
|
||||
use safekeeper_client::mgmt_api;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -1298,7 +1298,13 @@ impl Service {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let sync_position = Self::get_sync_position(&results)?;
|
||||
let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
|
||||
for res in results.into_iter().flatten() {
|
||||
let sk_position = (res.last_log_term, res.flush_lsn);
|
||||
if sync_position < sk_position {
|
||||
sync_position = sk_position;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
%generation,
|
||||
@@ -1592,36 +1598,4 @@ impl Service {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get membership switch responses from all safekeepers and return the sync position.
|
||||
///
|
||||
/// Sync position is a position equal or greater than the commit position.
|
||||
/// It is guaranteed that all WAL entries with (last_log_term, flush_lsn)
|
||||
/// greater than the sync position are not committed (= not on a quorum).
|
||||
///
|
||||
/// Returns error if there is no quorum of successful responses.
|
||||
fn get_sync_position(
|
||||
responses: &[mgmt_api::Result<TimelineMembershipSwitchResponse>],
|
||||
) -> Result<(Term, Lsn), ApiError> {
|
||||
let quorum_size = responses.len() / 2 + 1;
|
||||
|
||||
let mut wal_positions = responses
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|res| (res.last_log_term, res.flush_lsn))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Should be already checked if the responses are from tenant_timeline_set_membership_quorum.
|
||||
if wal_positions.len() < quorum_size {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"not enough successful responses to get sync position: {}/{}",
|
||||
wal_positions.len(),
|
||||
quorum_size,
|
||||
)));
|
||||
}
|
||||
|
||||
wal_positions.sort();
|
||||
|
||||
Ok(wal_positions[quorum_size - 1])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,26 +78,20 @@ class EndpointHttpClient(requests.Session):
|
||||
json: dict[str, str] = res.json()
|
||||
return json
|
||||
|
||||
def prewarm_lfc(self, from_endpoint_id: str | None = None) -> dict[str, str]:
|
||||
def prewarm_lfc(self, from_endpoint_id: str | None = None):
|
||||
"""
|
||||
Prewarm LFC cache from given endpoint and wait till it finishes or errors
|
||||
"""
|
||||
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
|
||||
self.post(self.prewarm_url, params=params).raise_for_status()
|
||||
return self.prewarm_lfc_wait()
|
||||
self.prewarm_lfc_wait()
|
||||
|
||||
def cancel_prewarm_lfc(self):
|
||||
"""
|
||||
Cancel LFC prewarm if any is ongoing
|
||||
"""
|
||||
self.delete(self.prewarm_url).raise_for_status()
|
||||
|
||||
def prewarm_lfc_wait(self) -> dict[str, str]:
|
||||
def prewarm_lfc_wait(self):
|
||||
"""
|
||||
Wait till LFC prewarm returns with error or success.
|
||||
If prewarm was not requested before calling this function, it will error
|
||||
"""
|
||||
statuses = "failed", "completed", "skipped", "cancelled"
|
||||
statuses = "failed", "completed", "skipped"
|
||||
|
||||
def prewarmed():
|
||||
json = self.prewarm_lfc_status()
|
||||
@@ -107,7 +101,6 @@ class EndpointHttpClient(requests.Session):
|
||||
wait_until(prewarmed, timeout=60)
|
||||
res = self.prewarm_lfc_status()
|
||||
assert res["status"] != "failed", res
|
||||
return res
|
||||
|
||||
def offload_lfc_status(self) -> dict[str, str]:
|
||||
res = self.get(self.offload_url)
|
||||
@@ -115,31 +108,29 @@ class EndpointHttpClient(requests.Session):
|
||||
json: dict[str, str] = res.json()
|
||||
return json
|
||||
|
||||
def offload_lfc(self) -> dict[str, str]:
|
||||
def offload_lfc(self):
|
||||
"""
|
||||
Offload LFC cache to endpoint storage and wait till offload finishes or errors
|
||||
"""
|
||||
self.post(self.offload_url).raise_for_status()
|
||||
return self.offload_lfc_wait()
|
||||
self.offload_lfc_wait()
|
||||
|
||||
def offload_lfc_wait(self) -> dict[str, str]:
|
||||
def offload_lfc_wait(self):
|
||||
"""
|
||||
Wait till LFC offload returns with error or success.
|
||||
If offload was not requested before calling this function, it will error
|
||||
"""
|
||||
statuses = "failed", "completed", "skipped"
|
||||
|
||||
def offloaded():
|
||||
json = self.offload_lfc_status()
|
||||
status, err = json["status"], json.get("error")
|
||||
assert status in statuses, f"{status}, {err=}"
|
||||
assert status in ["failed", "completed"], f"{status}, {err=}"
|
||||
|
||||
wait_until(offloaded, timeout=60)
|
||||
res = self.offload_lfc_status()
|
||||
assert res["status"] != "failed", res
|
||||
return res
|
||||
|
||||
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False) -> dict[str, str]:
|
||||
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
|
||||
url = f"http://localhost:{self.external_port}/promote"
|
||||
if disconnect:
|
||||
try: # send first request to start promote and disconnect
|
||||
|
||||
@@ -79,7 +79,6 @@ class NeonAPI:
|
||||
elif resp.status_code == 423 and resp.json()["message"] in {
|
||||
"endpoint is in some transitive state, could not suspend",
|
||||
"project already has running conflicting operations, scheduling of new ones is prohibited",
|
||||
"snapshot is in transition",
|
||||
}:
|
||||
retry = True
|
||||
self.retries4xx += 1
|
||||
@@ -106,7 +105,6 @@ class NeonAPI:
|
||||
branch_name: str | None = None,
|
||||
branch_role_name: str | None = None,
|
||||
branch_database_name: str | None = None,
|
||||
project_settings: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
data: dict[str, Any] = {
|
||||
"project": {
|
||||
@@ -123,8 +121,6 @@ class NeonAPI:
|
||||
data["project"]["branch"]["role_name"] = branch_role_name
|
||||
if branch_database_name:
|
||||
data["project"]["branch"]["database_name"] = branch_database_name
|
||||
if project_settings:
|
||||
data["project"]["settings"] = project_settings
|
||||
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
@@ -359,63 +355,6 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def create_snapshot(
|
||||
self,
|
||||
project_id: str,
|
||||
branch_id: str,
|
||||
lsn: str | None = None,
|
||||
timestamp: str | None = None,
|
||||
name: str | None = None,
|
||||
expires_at: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
params: dict[str, Any] = {
|
||||
"lsn": lsn,
|
||||
"timestamp": timestamp,
|
||||
"name": name,
|
||||
"expires_at": expires_at,
|
||||
}
|
||||
params = {key: value for key, value in params.items() if value is not None}
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
f"/projects/{project_id}/branches/{branch_id}/snapshot",
|
||||
params=params,
|
||||
json={},
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_snapshot(self, project_id: str, snapshot_id: str) -> dict[str, Any]:
|
||||
resp = self.__request("DELETE", f"/projects/{project_id}/snapshots/{snapshot_id}")
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def restore_snapshot(
|
||||
self,
|
||||
project_id: str,
|
||||
snapshot_id: str,
|
||||
target_branch_id: str,
|
||||
name: str | None = None,
|
||||
finalize_restore: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
data: dict[str, Any] = {
|
||||
"target_branch_id": target_branch_id,
|
||||
"finalize_restore": finalize_restore,
|
||||
}
|
||||
if name is not None:
|
||||
data["name"] = name
|
||||
log.info("Restore snapshot data: %s", data)
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
f"/projects/{project_id}/snapshots/{snapshot_id}/restore",
|
||||
json=data,
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]:
|
||||
resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}")
|
||||
return cast("dict[str,Any]", resp.json())
|
||||
@@ -457,14 +396,6 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_branch_endpoints(self, project_id: str, branch_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"GET",
|
||||
f"/projects/{project_id}/branches/{branch_id}/endpoints",
|
||||
headers={"Accept": "application/json", "Content-Type": "application/json"},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_endpoints(self, project_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"GET",
|
||||
|
||||
@@ -262,6 +262,7 @@ class PgProtocol:
|
||||
# pooler does not support statement_timeout
|
||||
# Check if the hostname contains the string 'pooler'
|
||||
hostname = result.get("host", "")
|
||||
log.info(f"Hostname: {hostname}")
|
||||
options = result.get("options", "")
|
||||
if "statement_timeout" not in options and "pooler" not in hostname:
|
||||
options = f"-cstatement_timeout=120s {options}"
|
||||
@@ -505,6 +506,8 @@ class NeonEnvBuilder:
|
||||
# Flag to use https listener in storage broker, generate local ssl certs,
|
||||
# and force pageservers and safekeepers to use https for storage broker api.
|
||||
self.use_https_storage_broker_api: bool = False
|
||||
# Flag to enable TLS for computes
|
||||
self.use_compute_tls: bool = False
|
||||
|
||||
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
|
||||
self.pageserver_get_vectored_concurrent_io: str | None = (
|
||||
@@ -1111,14 +1114,16 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
self.generate_local_ssl_certs = (
|
||||
self.generate_compute_tls_certs = config.use_compute_tls
|
||||
self.generate_local_tls_certs = (
|
||||
config.use_https_pageserver_api
|
||||
or config.use_https_safekeeper_api
|
||||
or config.use_https_storage_controller_api
|
||||
or config.use_https_storage_broker_api
|
||||
or config.use_compute_tls
|
||||
)
|
||||
self.ssl_ca_file = (
|
||||
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None
|
||||
self.tls_ca_file = (
|
||||
self.repo_dir.joinpath("rootCA.crt") if self.generate_local_tls_certs else None
|
||||
)
|
||||
|
||||
neon_local_env_vars = {}
|
||||
@@ -1197,7 +1202,8 @@ class NeonEnv:
|
||||
"endpoint_storage": {
|
||||
"listen_addr": f"127.0.0.1:{self.port_distributor.get_port()}",
|
||||
},
|
||||
"generate_local_ssl_certs": self.generate_local_ssl_certs,
|
||||
"generate_local_tls_certs": self.generate_local_tls_certs,
|
||||
"generate_compute_tls_certs": self.generate_compute_tls_certs,
|
||||
}
|
||||
|
||||
if config.use_https_storage_broker_api:
|
||||
@@ -1941,7 +1947,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.auth_enabled = auth_enabled
|
||||
self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS
|
||||
self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log"
|
||||
self.ssl_ca_file = env.ssl_ca_file
|
||||
self.tls_ca_file = env.tls_ca_file
|
||||
|
||||
def start(
|
||||
self,
|
||||
@@ -2014,8 +2020,8 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
return PageserverHttpClient(self.port, lambda: True, auth_token, *args, **kwargs)
|
||||
|
||||
def request(self, method, *args, **kwargs) -> requests.Response:
|
||||
if self.ssl_ca_file is not None:
|
||||
kwargs["verify"] = self.ssl_ca_file
|
||||
if self.tls_ca_file is not None:
|
||||
kwargs["verify"] = self.tls_ca_file
|
||||
resp = requests.request(method, *args, **kwargs)
|
||||
NeonStorageController.raise_api_exception(resp)
|
||||
|
||||
@@ -2313,7 +2319,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
timeline_id: TimelineId,
|
||||
new_sk_set: list[int],
|
||||
):
|
||||
log.info(f"migrate_safekeepers({tenant_id}, {timeline_id}, {new_sk_set})")
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate",
|
||||
@@ -5280,32 +5285,16 @@ class EndpointFactory:
|
||||
)
|
||||
|
||||
def stop_all(self, fail_on_error=True) -> Self:
|
||||
"""
|
||||
Stop all the endpoints in parallel.
|
||||
"""
|
||||
|
||||
# Note: raising an exception from a task in a task group cancels
|
||||
# all the other tasks. We don't want that, hence the 'stop_one'
|
||||
# function catches exceptions and puts them on the 'exceptions'
|
||||
# list for later processing.
|
||||
exceptions = []
|
||||
|
||||
async def stop_one(ep):
|
||||
exception = None
|
||||
for ep in self.endpoints:
|
||||
try:
|
||||
await asyncio.to_thread(ep.stop)
|
||||
ep.stop()
|
||||
except Exception as e:
|
||||
log.error(f"Failed to stop endpoint {ep.endpoint_id}: {e}")
|
||||
exceptions.append(e)
|
||||
exception = e
|
||||
|
||||
async def async_stop_all():
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
for ep in self.endpoints:
|
||||
tg.create_task(stop_one(ep))
|
||||
|
||||
asyncio.run(async_stop_all())
|
||||
|
||||
if fail_on_error and exceptions:
|
||||
raise ExceptionGroup("stopping an endpoint failed", exceptions)
|
||||
if fail_on_error and exception is not None:
|
||||
raise exception
|
||||
|
||||
return self
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ import time
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
|
||||
@@ -23,29 +22,6 @@ if TYPE_CHECKING:
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
class NeonSnapshot:
|
||||
"""
|
||||
A snapshot of the Neon Branch
|
||||
Gets the output of the API call af a snapshot creation
|
||||
"""
|
||||
|
||||
def __init__(self, project: NeonProject, snapshot: dict[str, Any]):
|
||||
self.project: NeonProject = project
|
||||
snapshot = snapshot["snapshot"]
|
||||
self.id: str = snapshot["id"]
|
||||
self.name: str = snapshot["name"]
|
||||
self.created_at: datetime = datetime.fromisoformat(snapshot["created_at"])
|
||||
self.source_branch: NeonBranch = project.branches[snapshot["source_branch_id"]]
|
||||
project.snapshots[self.id] = self
|
||||
self.restored: bool = False
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"id: {self.id}, name: {self.name}, created_at: {self.created_at}"
|
||||
|
||||
def delete(self) -> None:
|
||||
self.project.delete_snapshot(self.id)
|
||||
|
||||
|
||||
class NeonEndpoint:
|
||||
"""
|
||||
Neon Endpoint
|
||||
@@ -91,21 +67,9 @@ class NeonBranch:
|
||||
is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
project,
|
||||
branch: dict[str, Any],
|
||||
is_reset=False,
|
||||
primary_branch: NeonBranch | None = None,
|
||||
):
|
||||
def __init__(self, project, branch: dict[str, Any], is_reset=False):
|
||||
self.id: str = branch["branch"]["id"]
|
||||
self.desc = branch
|
||||
self.name: str | None = None
|
||||
if "name" in branch["branch"]:
|
||||
self.name = branch["branch"]["name"]
|
||||
self.restored_from: str | None = None
|
||||
if "restored_from" in branch["branch"]:
|
||||
self.restored_from = branch["branch"]["restored_from"]
|
||||
self.project: NeonProject = project
|
||||
self.neon_api: NeonAPI = project.neon_api
|
||||
self.project_id: str = branch["branch"]["project_id"]
|
||||
@@ -146,36 +110,13 @@ class NeonBranch:
|
||||
"PGPASSWORD": self.connection_parameters["password"],
|
||||
"PGSSLMODE": "require",
|
||||
}
|
||||
self.replicas: dict[str, NeonBranch] = {}
|
||||
self.primary_branch: NeonBranch | None = primary_branch
|
||||
if primary_branch:
|
||||
if not self.connection_parameters:
|
||||
raise ValueError(
|
||||
"connection_parameters is required when primary_branch is specified"
|
||||
)
|
||||
self.project.replicas[self.id] = self
|
||||
primary_branch.replicas[self.id] = self
|
||||
with psycopg2.connect(primary_branch.connstr()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f"CREATE PUBLICATION {self.id} FOR ALL TABLES")
|
||||
conn.commit()
|
||||
with psycopg2.connect(self.connstr()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"CREATE SUBSCRIPTION {self.id} CONNECTION '{primary_branch.connstr()}' PUBLICATION {self.id}"
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def __str__(self):
|
||||
"""
|
||||
Prints the branch's information with all the predecessors
|
||||
Prints the branch's name with all the predecessors
|
||||
(r) means the branch is a reset one
|
||||
"""
|
||||
name = f"({self.name})" if self.name and self.name != self.id else ""
|
||||
restored_from = f"(restored_from: {self.restored_from})" if self.restored_from else ""
|
||||
ancestor = (
|
||||
f" <- {self.primary_branch}" if self.primary_branch else f", parent: {self.parent}"
|
||||
)
|
||||
return f"{self.id}{name}{restored_from}{ancestor}"
|
||||
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
|
||||
|
||||
def random_time(self) -> datetime:
|
||||
min_time = max(
|
||||
@@ -187,10 +128,8 @@ class NeonBranch:
|
||||
log.info("min_time: %s, max_time: %s", min_time, max_time)
|
||||
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
|
||||
|
||||
def create_child_branch(
|
||||
self, parent_timestamp: datetime | None = None, primary_branch: NeonBranch | None = None
|
||||
) -> NeonBranch | None:
|
||||
return self.project.create_branch(self.id, parent_timestamp, primary_branch=primary_branch)
|
||||
def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
|
||||
return self.project.create_branch(self.id, parent_timestamp)
|
||||
|
||||
def create_ro_endpoint(self) -> NeonEndpoint | None:
|
||||
if not self.project.check_limit_endpoints():
|
||||
@@ -213,9 +152,6 @@ class NeonBranch:
|
||||
self.project.terminate_benchmark(self.id)
|
||||
|
||||
def reset_to_parent(self) -> None:
|
||||
"""
|
||||
Resets the branch to the parent branch
|
||||
"""
|
||||
for ep in self.project.endpoints.values():
|
||||
if ep.type == "read_only":
|
||||
ep.terminate_benchmark()
|
||||
@@ -281,19 +217,6 @@ class NeonBranch:
|
||||
ep.start_benchmark()
|
||||
return res
|
||||
|
||||
def create_logical_replica(self) -> NeonBranch | None:
|
||||
if self.primary_branch is not None:
|
||||
raise RuntimeError("The primary branch cannot be a logical replica")
|
||||
if self.id in self.project.reset_branches:
|
||||
raise RuntimeError("Reset branch cannot be a primary branch")
|
||||
replica = self.create_child_branch(primary_branch=self)
|
||||
return replica
|
||||
|
||||
def connstr(self):
|
||||
if self.connection_parameters is None:
|
||||
raise RuntimeError("Connection parameters are not defined")
|
||||
return " ".join([f"{key}={value}" for key, value in self.connection_parameters.items()])
|
||||
|
||||
|
||||
class NeonProject:
|
||||
"""
|
||||
@@ -305,9 +228,7 @@ class NeonProject:
|
||||
self.neon_api = neon_api
|
||||
self.pg_bin = pg_bin
|
||||
proj = self.neon_api.create_project(
|
||||
pg_version,
|
||||
f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
|
||||
project_settings={"enable_logical_replication": True},
|
||||
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
|
||||
)
|
||||
self.id: str = proj["project"]["id"]
|
||||
self.name: str = proj["project"]["name"]
|
||||
@@ -319,7 +240,6 @@ class NeonProject:
|
||||
# Leaf branches are the branches, which do not have children
|
||||
self.leaf_branches: dict[str, NeonBranch] = {}
|
||||
self.branches: dict[str, NeonBranch] = {}
|
||||
self.branch_num: int = 0
|
||||
self.reset_branches: set[str] = set()
|
||||
self.main_branch: NeonBranch = NeonBranch(self, proj)
|
||||
self.main_branch.connection_parameters = self.connection_parameters
|
||||
@@ -333,9 +253,6 @@ class NeonProject:
|
||||
self.limits: dict[str, Any] = self.get_limits()["limits"]
|
||||
self.read_only_endpoints_total: int = 0
|
||||
self.min_time: datetime = datetime.now(UTC)
|
||||
self.snapshots: dict[str, NeonSnapshot] = {}
|
||||
self.snapshot_num: int = 0
|
||||
self.replicas: dict[str, NeonBranch] = {}
|
||||
|
||||
def get_limits(self) -> dict[str, Any]:
|
||||
return self.neon_api.get_project_limits(self.id)
|
||||
@@ -363,11 +280,7 @@ class NeonProject:
|
||||
return False
|
||||
|
||||
def create_branch(
|
||||
self,
|
||||
parent_id: str | None = None,
|
||||
parent_timestamp: datetime | None = None,
|
||||
is_reset: bool = False,
|
||||
primary_branch: NeonBranch | None = None,
|
||||
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
|
||||
) -> NeonBranch | None:
|
||||
self.wait()
|
||||
if not self.check_limit_branches():
|
||||
@@ -380,14 +293,14 @@ class NeonProject:
|
||||
branch_def = self.neon_api.create_branch(
|
||||
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
|
||||
)
|
||||
new_branch = NeonBranch(self, branch_def, is_reset, primary_branch)
|
||||
new_branch = NeonBranch(self, branch_def)
|
||||
self.wait()
|
||||
return new_branch
|
||||
|
||||
def delete_branch(self, branch_id: str) -> None:
|
||||
parent = self.branches[branch_id].parent
|
||||
if not parent or branch_id == self.main_branch.id:
|
||||
raise RuntimeError("Cannot delete the main branch or a branch restored from a snapshot")
|
||||
raise RuntimeError("Cannot delete the main branch")
|
||||
if branch_id not in self.leaf_branches and branch_id not in self.reset_branches:
|
||||
raise RuntimeError(f"The branch {branch_id}, probably, has ancestors")
|
||||
if branch_id not in self.branches:
|
||||
@@ -400,18 +313,7 @@ class NeonProject:
|
||||
if branch_id not in self.reset_branches:
|
||||
self.terminate_benchmark(branch_id)
|
||||
self.neon_api.delete_branch(self.id, branch_id)
|
||||
primary_branch = self.branches[branch_id].primary_branch
|
||||
if primary_branch is not None:
|
||||
with psycopg2.connect(primary_branch.connstr()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(f"DROP PUBLICATION {branch_id}")
|
||||
conn.commit()
|
||||
parent.replicas.pop(branch_id)
|
||||
self.replicas.pop(branch_id)
|
||||
else:
|
||||
for replica in self.branches[branch_id].replicas.values():
|
||||
replica.delete()
|
||||
if len(parent.children) == 1 and parent.parent is not None:
|
||||
if len(parent.children) == 1 and parent.id != self.main_branch.id:
|
||||
self.leaf_branches[parent.id] = parent
|
||||
parent.children.pop(branch_id)
|
||||
if branch_id in self.leaf_branches:
|
||||
@@ -431,26 +333,6 @@ class NeonProject:
|
||||
log.info("No leaf branches found")
|
||||
return target
|
||||
|
||||
def get_random_parent_branch(self) -> NeonBranch:
|
||||
return self.branches[
|
||||
random.choice(
|
||||
list(set(self.branches.keys()) - self.reset_branches - set(self.replicas.keys()))
|
||||
)
|
||||
]
|
||||
|
||||
def gen_branch_name(self) -> str:
|
||||
self.branch_num += 1
|
||||
return f"branch{self.branch_num}"
|
||||
|
||||
def get_random_snapshot(self) -> NeonSnapshot | None:
|
||||
snapshot: NeonSnapshot | None = None
|
||||
avail_snapshots = [sn for sn in self.snapshots.values() if not sn.restored]
|
||||
if avail_snapshots:
|
||||
snapshot = random.choice(avail_snapshots)
|
||||
else:
|
||||
log.info("No snapshots found")
|
||||
return snapshot
|
||||
|
||||
def delete_endpoint(self, endpoint_id: str) -> None:
|
||||
self.terminate_benchmark(endpoint_id)
|
||||
self.neon_api.delete_endpoint(self.id, endpoint_id)
|
||||
@@ -527,116 +409,6 @@ class NeonProject:
|
||||
self.restore_num += 1
|
||||
return f"restore{self.restore_num}"
|
||||
|
||||
def gen_snapshot_name(self) -> str:
|
||||
self.snapshot_num += 1
|
||||
return f"snapshot{self.snapshot_num}"
|
||||
|
||||
def create_snapshot(
|
||||
self,
|
||||
lsn: str | None = None,
|
||||
timestamp: datetime | None = None,
|
||||
) -> NeonSnapshot:
|
||||
"""
|
||||
Create a new Neon snapshot for the current project
|
||||
Two optional arguments: lsn and timestamp are mutually exclusive
|
||||
they instruct to create a snapshot with the specific lns or timestamp
|
||||
"""
|
||||
snapshot_name = self.gen_snapshot_name()
|
||||
with psycopg2.connect(self.connection_uri) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We will check the value we set now after the snapshot restored to verify consistency
|
||||
cur.execute(
|
||||
f"INSERT INTO sanity_check (name, value) VALUES "
|
||||
f"('snapsot_name', '{snapshot_name}') ON CONFLICT (name) DO UPDATE SET value = EXCLUDED.value"
|
||||
)
|
||||
conn.commit()
|
||||
snapshot = NeonSnapshot(
|
||||
self,
|
||||
self.neon_api.create_snapshot(
|
||||
self.id,
|
||||
self.main_branch.id,
|
||||
lsn,
|
||||
timestamp.isoformat().replace("+00:00", "Z") if timestamp else None,
|
||||
snapshot_name,
|
||||
),
|
||||
)
|
||||
self.wait()
|
||||
# Now we taint the value after the snapshot was taken
|
||||
cur.execute("UPDATE sanity_check SET value = 'tainted' || value")
|
||||
conn.commit()
|
||||
return snapshot
|
||||
|
||||
def delete_snapshot(self, snapshot_id: str) -> None:
|
||||
"""
|
||||
Deletes the snapshot with the given id
|
||||
"""
|
||||
self.wait()
|
||||
self.neon_api.delete_snapshot(self.id, snapshot_id)
|
||||
self.snapshots.pop(snapshot_id)
|
||||
self.wait()
|
||||
|
||||
def restore_snapshot(self, snapshot_id: str) -> NeonBranch | None:
|
||||
"""
|
||||
Creates a new Neon branch for the current project, then restores the snapshot
|
||||
with the given id
|
||||
"""
|
||||
target_branch = self.get_random_parent_branch().create_child_branch()
|
||||
if not target_branch:
|
||||
return None
|
||||
self.snapshots[snapshot_id].restored = True
|
||||
new_branch_def: dict[str, Any] = self.neon_api.restore_snapshot(
|
||||
self.id,
|
||||
snapshot_id,
|
||||
target_branch.id,
|
||||
self.gen_branch_name(),
|
||||
)
|
||||
self.wait()
|
||||
new_branch_def = self.neon_api.get_branch_details(self.id, new_branch_def["branch"]["id"])
|
||||
# The restored branch will lose the parent afterward, but it has it during the restoration.
|
||||
# So, we delete parent_id
|
||||
new_branch_def["branch"].pop("parent_id")
|
||||
new_branch = NeonBranch(self, new_branch_def)
|
||||
log.info("Restored snapshot to the branch: %s", new_branch)
|
||||
target_branch_def = self.neon_api.get_branch_details(self.id, target_branch.id)
|
||||
if "name" in target_branch_def["branch"]:
|
||||
target_branch.name = target_branch_def["branch"]["name"]
|
||||
if new_branch.connection_parameters is None:
|
||||
if not new_branch.endpoints:
|
||||
for ep in self.neon_api.get_branch_endpoints(self.id, new_branch.id)["endpoints"]:
|
||||
if ep["id"] not in self.endpoints:
|
||||
NeonEndpoint(self, ep)
|
||||
new_branch.connection_parameters = self.connection_parameters.copy()
|
||||
for ep in new_branch.endpoints.values():
|
||||
if ep.type == "read_write":
|
||||
new_branch.connection_parameters["host"] = ep.host
|
||||
break
|
||||
new_branch.connect_env = {
|
||||
"PGHOST": new_branch.connection_parameters["host"],
|
||||
"PGUSER": new_branch.connection_parameters["role"],
|
||||
"PGDATABASE": new_branch.connection_parameters["database"],
|
||||
"PGPASSWORD": new_branch.connection_parameters["password"],
|
||||
"PGSSLMODE": "require",
|
||||
}
|
||||
with psycopg2.connect(
|
||||
host=new_branch.connection_parameters["host"],
|
||||
port=5432,
|
||||
user=new_branch.connection_parameters["role"],
|
||||
password=new_branch.connection_parameters["password"],
|
||||
database=new_branch.connection_parameters["database"],
|
||||
) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT value FROM sanity_check WHERE name = 'snapsot_name'")
|
||||
snapshot_name = None
|
||||
if row := cur.fetchone():
|
||||
snapshot_name = row[0]
|
||||
# We verify here that the value we select from the table matches with the snapshot name
|
||||
# To ensure consistency
|
||||
assert snapshot_name == self.snapshots[snapshot_id].name
|
||||
self.wait()
|
||||
target_branch.start_benchmark()
|
||||
new_branch.start_benchmark()
|
||||
return new_branch
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def setup_class(
|
||||
@@ -666,7 +438,9 @@ def do_action(project: NeonProject, action: str) -> bool:
|
||||
if action == "new_branch" or action == "new_branch_random_time":
|
||||
use_random_time: bool = action == "new_branch_random_time"
|
||||
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
|
||||
parent = project.get_random_parent_branch()
|
||||
parent = project.branches[
|
||||
random.choice(list(set(project.branches.keys()) - project.reset_branches))
|
||||
]
|
||||
child = parent.create_child_branch(parent.random_time() if use_random_time else None)
|
||||
if child is None:
|
||||
return False
|
||||
@@ -705,31 +479,6 @@ def do_action(project: NeonProject, action: str) -> bool:
|
||||
return False
|
||||
log.info("Reset to parent %s", target)
|
||||
target.reset_to_parent()
|
||||
elif action == "create_snapshot":
|
||||
snapshot = project.create_snapshot()
|
||||
if snapshot is None:
|
||||
return False
|
||||
log.info("Created snapshot %s", snapshot)
|
||||
elif action == "restore_snapshot":
|
||||
if (snapshot_to_restore := project.get_random_snapshot()) is None:
|
||||
return False
|
||||
log.info("Restoring snapshot %s", snapshot_to_restore)
|
||||
if project.restore_snapshot(snapshot_to_restore.id) is None:
|
||||
return False
|
||||
elif action == "delete_snapshot":
|
||||
snapshot_to_delete = project.get_random_snapshot()
|
||||
if snapshot_to_delete is None:
|
||||
return False
|
||||
snapshot_to_delete.delete()
|
||||
log.info("Deleted snapshot %s", snapshot_to_delete)
|
||||
elif action == "create_logical_replica":
|
||||
primary: NeonBranch | None = project.get_random_parent_branch()
|
||||
if primary is None:
|
||||
return False
|
||||
replica: NeonBranch | None = primary.create_logical_replica()
|
||||
if replica is None:
|
||||
return False
|
||||
log.info("Created logical replica %s", replica)
|
||||
else:
|
||||
raise ValueError(f"The action {action} is unknown")
|
||||
return True
|
||||
@@ -763,28 +512,12 @@ def test_api_random(
|
||||
("delete_branch", 1.2),
|
||||
("restore_random_time", 0.9),
|
||||
("reset_to_parent", 0.3),
|
||||
("create_snapshot", 0.2),
|
||||
("restore_snapshot", 0.1),
|
||||
("delete_snapshot", 0.1),
|
||||
)
|
||||
if num_ops_env := os.getenv("NUM_OPERATIONS"):
|
||||
num_operations = int(num_ops_env)
|
||||
else:
|
||||
num_operations = 250
|
||||
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
|
||||
# Create a table for sanity check
|
||||
# We are going to leve some control values there to check, e.g., after restoring a snapshot
|
||||
pg_bin.run(
|
||||
[
|
||||
"psql",
|
||||
"-c",
|
||||
"CREATE TABLE IF NOT EXISTS sanity_check (name VARCHAR NOT NULL PRIMARY KEY, value VARCHAR)",
|
||||
],
|
||||
env=project.main_branch.connect_env,
|
||||
)
|
||||
# To not go to the past where pgbench tables do not exist
|
||||
time.sleep(1)
|
||||
project.min_time = datetime.now(UTC)
|
||||
# To not go to the past where pgbench tables do not exist
|
||||
time.sleep(1)
|
||||
project.min_time = datetime.now(UTC)
|
||||
|
||||
@@ -863,6 +863,7 @@ def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder)
|
||||
assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Lakebase mode")
|
||||
def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that when the pageserver detects corruption during image layer creation,
|
||||
@@ -889,9 +890,7 @@ def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
workload = Workload(
|
||||
env, tenant_id, timeline_id, endpoint_opts={"config_lines": ["neon.lakebase_mode=true"]}
|
||||
)
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
|
||||
# Enable the failpoint that will cause image layer creation to fail due to a (simulated) detected
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import random
|
||||
import threading
|
||||
from enum import StrEnum
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from typing import Any
|
||||
|
||||
@@ -47,23 +47,19 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor)
|
||||
# With autoprewarm, we need to be sure LFC was offloaded after all writes
|
||||
# finish, so we sleep. Otherwise we'll have less prewarmed pages than we want
|
||||
sleep(AUTOOFFLOAD_INTERVAL_SECS)
|
||||
offload_res = client.offload_lfc_wait()
|
||||
log.info(offload_res)
|
||||
return offload_res
|
||||
client.offload_lfc_wait()
|
||||
return
|
||||
|
||||
if method == PrewarmMethod.COMPUTE_CTL:
|
||||
status = client.prewarm_lfc_status()
|
||||
assert status["status"] == "not_prewarmed"
|
||||
assert "error" not in status
|
||||
offload_res = client.offload_lfc()
|
||||
log.info(offload_res)
|
||||
client.offload_lfc()
|
||||
assert client.prewarm_lfc_status()["status"] == "not_prewarmed"
|
||||
|
||||
parsed = prom_parse(client)
|
||||
desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0}
|
||||
assert parsed == desired, f"{parsed=} != {desired=}"
|
||||
|
||||
return offload_res
|
||||
return
|
||||
|
||||
raise AssertionError(f"{method} not in PrewarmMethod")
|
||||
|
||||
@@ -72,30 +68,21 @@ def prewarm_endpoint(
|
||||
method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor, lfc_state: str | None
|
||||
):
|
||||
if method == PrewarmMethod.AUTOPREWARM:
|
||||
prewarm_res = client.prewarm_lfc_wait()
|
||||
log.info(prewarm_res)
|
||||
client.prewarm_lfc_wait()
|
||||
elif method == PrewarmMethod.COMPUTE_CTL:
|
||||
prewarm_res = client.prewarm_lfc()
|
||||
log.info(prewarm_res)
|
||||
return prewarm_res
|
||||
client.prewarm_lfc()
|
||||
elif method == PrewarmMethod.POSTGRES:
|
||||
cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,))
|
||||
|
||||
|
||||
def check_prewarmed_contains(
|
||||
def check_prewarmed(
|
||||
method: PrewarmMethod, client: EndpointHttpClient, desired_status: dict[str, str | int]
|
||||
):
|
||||
if method == PrewarmMethod.AUTOPREWARM:
|
||||
prewarm_status = client.prewarm_lfc_status()
|
||||
for k in desired_status:
|
||||
assert desired_status[k] == prewarm_status[k]
|
||||
|
||||
assert client.prewarm_lfc_status() == desired_status
|
||||
assert prom_parse(client)[PREWARM_LABEL] == 1
|
||||
elif method == PrewarmMethod.COMPUTE_CTL:
|
||||
prewarm_status = client.prewarm_lfc_status()
|
||||
for k in desired_status:
|
||||
assert desired_status[k] == prewarm_status[k]
|
||||
|
||||
assert client.prewarm_lfc_status() == desired_status
|
||||
desired = {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1, PREWARM_ERR_LABEL: 0, OFFLOAD_ERR_LABEL: 0}
|
||||
assert prom_parse(client) == desired
|
||||
|
||||
@@ -162,6 +149,9 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
log.info(f"Used LFC size: {lfc_used_pages}")
|
||||
pg_cur.execute("select * from neon.get_prewarm_info()")
|
||||
total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
|
||||
log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}")
|
||||
progress = (prewarmed + skipped) * 100 // total
|
||||
log.info(f"Prewarm progress: {progress}%")
|
||||
assert lfc_used_pages > 10000
|
||||
assert total > 0
|
||||
assert prewarmed > 0
|
||||
@@ -171,54 +161,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
|
||||
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
|
||||
check_prewarmed_contains(method, client, desired)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
def test_lfc_prewarm_cancel(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test we can cancel LFC prewarm and prewarm successfully after
|
||||
"""
|
||||
env = neon_simple_env
|
||||
n_records = 1000000
|
||||
cfg = [
|
||||
"autovacuum = off",
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=1GB",
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
]
|
||||
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("create schema neon; create extension neon with schema neon")
|
||||
pg_cur.execute("create database lfc")
|
||||
|
||||
lfc_conn = endpoint.connect(dbname="lfc")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
log.info(f"Inserting {n_records} rows")
|
||||
lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))")
|
||||
lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))")
|
||||
log.info(f"Inserted {n_records} rows")
|
||||
|
||||
client = endpoint.http_client()
|
||||
method = PrewarmMethod.COMPUTE_CTL
|
||||
offload_lfc(method, client, pg_cur)
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
thread = Thread(target=lambda: prewarm_endpoint(method, client, pg_cur, None))
|
||||
thread.start()
|
||||
# wait 2 seconds to ensure we cancel prewarm SQL query
|
||||
sleep(2)
|
||||
client.cancel_prewarm_lfc()
|
||||
thread.join()
|
||||
assert client.prewarm_lfc_status()["status"] == "cancelled"
|
||||
|
||||
prewarm_endpoint(method, client, pg_cur, None)
|
||||
assert client.prewarm_lfc_status()["status"] == "completed"
|
||||
check_prewarmed(method, client, desired)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
|
||||
@@ -235,8 +178,9 @@ def test_lfc_prewarm_empty(neon_simple_env: NeonEnv):
|
||||
cur = conn.cursor()
|
||||
cur.execute("create schema neon; create extension neon with schema neon")
|
||||
method = PrewarmMethod.COMPUTE_CTL
|
||||
assert offload_lfc(method, client, cur)["status"] == "skipped"
|
||||
assert prewarm_endpoint(method, client, cur, None)["status"] == "skipped"
|
||||
offload_lfc(method, client, cur)
|
||||
prewarm_endpoint(method, client, cur, None)
|
||||
assert client.prewarm_lfc_status()["status"] == "skipped"
|
||||
|
||||
|
||||
# autoprewarm isn't needed as we prewarm manually
|
||||
@@ -307,11 +251,11 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
|
||||
|
||||
workload_threads = []
|
||||
for _ in range(n_threads):
|
||||
t = Thread(target=workload)
|
||||
t = threading.Thread(target=workload)
|
||||
workload_threads.append(t)
|
||||
t.start()
|
||||
|
||||
prewarm_thread = Thread(target=prewarm)
|
||||
prewarm_thread = threading.Thread(target=prewarm)
|
||||
prewarm_thread.start()
|
||||
|
||||
def prewarmed():
|
||||
|
||||
@@ -286,177 +286,3 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
|
||||
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
|
||||
# The timeline should remain deleted.
|
||||
expect_deleted(second_sk)
|
||||
|
||||
|
||||
def test_safekeeper_migration_stale_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that safekeeper migration handles stale timeline correctly by migrating to
|
||||
a safekeeper with a stale timeline.
|
||||
1. Check that we are waiting for the stale timeline to catch up with the commit lsn.
|
||||
The migration might fail if there is no compute to advance the WAL.
|
||||
2. Check that we rely on last_log_term (and not the current term) when waiting for the
|
||||
sync_position on step 7.
|
||||
3. Check that migration succeeds if the compute is running.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 2
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
"timeline_safekeeper_count": 1,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
env.storage_controller.allowed_errors.append(".*not enough successful .* to reach quorum.*")
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
active_sk = env.get_safekeeper(mconf["sk_set"][0])
|
||||
other_sk = [sk for sk in env.safekeepers if sk.id != active_sk.id][0]
|
||||
|
||||
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
|
||||
ep.start(safekeeper_generation=1, safekeepers=[active_sk.id])
|
||||
ep.safe_psql("CREATE TABLE t(a int)")
|
||||
ep.safe_psql("INSERT INTO t VALUES (0)")
|
||||
|
||||
# Pull the timeline to other_sk, so other_sk now has a "stale" timeline on it.
|
||||
other_sk.pull_timeline([active_sk], env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# Advance the WAL on active_sk.
|
||||
ep.safe_psql("INSERT INTO t VALUES (1)")
|
||||
|
||||
# The test is more tricky if we have the same last_log_term but different term/flush_lsn.
|
||||
# Stop the active_sk during the endpoint shutdown because otherwise compute_ctl runs
|
||||
# sync_safekeepers and advances last_log_term on active_sk.
|
||||
active_sk.stop()
|
||||
ep.stop(mode="immediate")
|
||||
active_sk.start()
|
||||
|
||||
active_sk_status = active_sk.http_client().timeline_status(
|
||||
env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
other_sk_status = other_sk.http_client().timeline_status(
|
||||
env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
|
||||
# other_sk should have the same last_log_term, but a stale flush_lsn.
|
||||
assert active_sk_status.last_log_term == other_sk_status.last_log_term
|
||||
assert active_sk_status.flush_lsn > other_sk_status.flush_lsn
|
||||
|
||||
commit_lsn = active_sk_status.flush_lsn
|
||||
|
||||
# Bump the term on other_sk to make it higher than active_sk.
|
||||
# This is to make sure we don't use current term instead of last_log_term in the algorithm.
|
||||
other_sk.http_client().term_bump(
|
||||
env.initial_tenant, env.initial_timeline, active_sk_status.term + 100
|
||||
)
|
||||
|
||||
# TODO(diko): now it fails because the timeline on other_sk is stale and there is no compute
|
||||
# to catch up it with active_sk. It might be fixed in https://databricks.atlassian.net/browse/LKB-946
|
||||
# if we delete stale timelines before starting the migration.
|
||||
# But the rest of the test is still valid: we should not lose committed WAL after the migration.
|
||||
with pytest.raises(
|
||||
StorageControllerApiException, match="not enough successful .* to reach quorum"
|
||||
):
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, [other_sk.id]
|
||||
)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["new_sk_set"] == [other_sk.id]
|
||||
assert mconf["sk_set"] == [active_sk.id]
|
||||
assert mconf["generation"] == 2
|
||||
|
||||
# Start the endpoint, so it advances the WAL on other_sk.
|
||||
ep.start(safekeeper_generation=2, safekeepers=[active_sk.id, other_sk.id])
|
||||
# Now the migration should succeed.
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, [other_sk.id]
|
||||
)
|
||||
|
||||
# Check that we didn't lose committed WAL.
|
||||
assert (
|
||||
other_sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline).flush_lsn
|
||||
>= commit_lsn
|
||||
)
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
|
||||
|
||||
|
||||
def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that we pull the timeline from the most advanced safekeeper during the
|
||||
migration and do not lose committed WAL.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 4
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
"timeline_safekeeper_count": 3,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
sk_set = mconf["sk_set"]
|
||||
assert len(sk_set) == 3
|
||||
|
||||
other_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]
|
||||
|
||||
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
|
||||
ep.start(safekeeper_generation=1, safekeepers=sk_set)
|
||||
ep.safe_psql("CREATE TABLE t(a int)")
|
||||
ep.safe_psql("INSERT INTO t VALUES (0)")
|
||||
|
||||
# Stop one sk, so we have a lagging WAL on it.
|
||||
env.get_safekeeper(sk_set[0]).stop()
|
||||
# Advance the WAL on the other sks.
|
||||
ep.safe_psql("INSERT INTO t VALUES (1)")
|
||||
|
||||
# Stop other sks to make sure compute_ctl doesn't advance the last_log_term on them during shutdown.
|
||||
for sk_id in sk_set[1:]:
|
||||
env.get_safekeeper(sk_id).stop()
|
||||
ep.stop(mode="immediate")
|
||||
for sk_id in sk_set:
|
||||
env.get_safekeeper(sk_id).start()
|
||||
|
||||
# Bump the term on the lagging sk to make sure we don't use it to choose the most advanced sk.
|
||||
env.get_safekeeper(sk_set[0]).http_client().term_bump(
|
||||
env.initial_tenant, env.initial_timeline, 100
|
||||
)
|
||||
|
||||
def get_commit_lsn(sk_set: list[int]):
|
||||
flush_lsns = []
|
||||
last_log_terms = []
|
||||
for sk_id in sk_set:
|
||||
sk = env.get_safekeeper(sk_id)
|
||||
status = sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline)
|
||||
flush_lsns.append(status.flush_lsn)
|
||||
last_log_terms.append(status.last_log_term)
|
||||
|
||||
# In this test we assume that all sks have the same last_log_term.
|
||||
assert len(set(last_log_terms)) == 1
|
||||
|
||||
flush_lsns.sort(reverse=True)
|
||||
commit_lsn = flush_lsns[len(sk_set) // 2]
|
||||
|
||||
log.info(f"sk_set: {sk_set}, flush_lsns: {flush_lsns}, commit_lsn: {commit_lsn}")
|
||||
return commit_lsn
|
||||
|
||||
commit_lsn_before_migration = get_commit_lsn(sk_set)
|
||||
|
||||
# Make two migrations, so the lagging sk stays in the sk_set, but other sks are replaced.
|
||||
new_sk_set1 = [sk_set[0], sk_set[1], other_sk] # remove sk_set[2], add other_sk
|
||||
new_sk_set2 = [sk_set[0], other_sk, sk_set[2]] # remove sk_set[1], add sk_set[2] back
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, new_sk_set1
|
||||
)
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, new_sk_set2
|
||||
)
|
||||
|
||||
commit_lsn_after_migration = get_commit_lsn(new_sk_set2)
|
||||
|
||||
# We should not lose committed WAL.
|
||||
# If we have choosen the lagging sk to pull the timeline from, this might fail.
|
||||
assert commit_lsn_before_migration <= commit_lsn_after_migration
|
||||
|
||||
ep.start(safekeeper_generation=5, safekeepers=new_sk_set2)
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
|
||||
|
||||
@@ -19,7 +19,7 @@ def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
|
||||
|
||||
def test_safekeeper_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
@@ -37,7 +37,7 @@ def test_safekeeper_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# 1. Make simple https request.
|
||||
addr = f"https://localhost:{sk.port.https}/v1/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
|
||||
# Note: http_port is intentionally wrong.
|
||||
# Storcon should not use it if use_https is on.
|
||||
@@ -83,7 +83,7 @@ def test_storage_controller_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
addr = f"https://localhost:{env.storage_controller.port}/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
|
||||
|
||||
def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
|
||||
@@ -111,7 +111,7 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# 1. Check if https works.
|
||||
addr = f"https://localhost:{port}/v1/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
|
||||
ps_cert_path = env.pageserver.workdir / "server.crt"
|
||||
ps_key_path = env.pageserver.workdir / "server.key"
|
||||
@@ -136,7 +136,7 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(error_reloading_cert)
|
||||
|
||||
# 4. Check that it uses old cert.
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
cur_cert = ssl.get_server_certificate(("localhost", port))
|
||||
assert cur_cert == ps_cert
|
||||
|
||||
@@ -150,7 +150,7 @@ def test_certificate_rotation(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(cert_reloaded)
|
||||
|
||||
# 6. Check that server returns new cert.
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
cur_cert = ssl.get_server_certificate(("localhost", port))
|
||||
assert cur_cert == sk_cert
|
||||
|
||||
@@ -174,7 +174,7 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
addr = f"https://localhost:{env.pageserver.service_port.https}/v1/status"
|
||||
requests.get(addr, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(addr, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
|
||||
new_https_conn_count = (
|
||||
ps_client.get_metric_value("http_server_connection_started_total", filter_https) or 0
|
||||
@@ -227,10 +227,27 @@ def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder):
|
||||
# 1. Simple check that HTTPS is enabled and works.
|
||||
url = env.broker.client_url() + "/status"
|
||||
assert url.startswith("https://")
|
||||
requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status()
|
||||
requests.get(url, verify=str(env.tls_ca_file)).raise_for_status()
|
||||
|
||||
# 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS.
|
||||
workload = Workload(env, env.initial_tenant, env.initial_timeline)
|
||||
workload.init()
|
||||
workload.write_rows(10)
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_compute_tls(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
neon_env_builder.use_compute_tls = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.create_branch("test_compute_tls")
|
||||
|
||||
with env.endpoints.create_start("test_compute_tls") as endpoint:
|
||||
res = endpoint.safe_psql(
|
||||
"select ssl from pg_stat_ssl where pid = pg_backend_pid();",
|
||||
sslmode="verify-full",
|
||||
sslrootcert=env.tls_ca_file,
|
||||
)
|
||||
assert res == [(True,)]
|
||||
|
||||
@@ -2757,37 +2757,18 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
|
||||
|
||||
# Set a very small disk usage limit (1KB)
|
||||
neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"]
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Create a timeline and endpoint
|
||||
env.create_branch("test_timeline_disk_usage_limit")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_timeline_disk_usage_limit",
|
||||
config_lines=[
|
||||
"neon.lakebase_mode=true",
|
||||
],
|
||||
)
|
||||
|
||||
# Install the neon extension in the test database. We need it to query perf counter metrics.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon")
|
||||
# Sanity-check safekeeper connection status in neon_perf_counters in the happy case.
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 active safekeeper"
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
|
||||
endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
|
||||
|
||||
# Get the safekeeper
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# Restart the safekeeper with a very small disk usage limit (1KB)
|
||||
sk.stop().start(["--max-timeline-disk-usage-bytes=1024"])
|
||||
|
||||
# Inject a failpoint to stop WAL backup
|
||||
with sk.http_client() as http_cli:
|
||||
http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
|
||||
@@ -2813,18 +2794,6 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(error_logged)
|
||||
log.info("Found expected error message in compute log, resuming.")
|
||||
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Confirm that neon_perf_counters also indicates that there are no active safekeepers
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (0,), "Expected 0 active safekeepers"
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
|
||||
|
||||
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
|
||||
# implemented didn't work as expected.
|
||||
time.sleep(2)
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 2155cb165d...c9f9fdd011
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 2aaab3bb4a...aaaeff2550
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user