Compare commits

...

2 Commits

Author SHA1 Message Date
Mikhail Kot
d6474ad0ed use parsed spec safekeepers from GUC 2025-07-31 21:21:10 +01:00
Mikhail Kot
0c5250b292 promotion fixes 2025-07-31 15:49:55 +01:00
2 changed files with 21 additions and 12 deletions

View File

@@ -1,6 +1,7 @@
use crate::compute::ComputeNode;
use anyhow::{Context, bail};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use std::sync::Arc;
use std::time::Instant;
use tracing::info;
@@ -9,7 +10,7 @@ impl ComputeNode {
/// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let this = self.clone();
let promote_fn = async move || match this.promote_impl(cfg).await {
Ok(state) => state,
@@ -38,7 +39,14 @@ impl ComputeNode {
task.borrow().clone()
}
async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
async fn promote_impl(self: &Arc<Self>, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
#[allow(unused_mut)]
let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
let safekeepers_str = new_pspec.safekeeper_connstrings.join(",");
if safekeepers_str.is_empty() {
bail!("empty safekeepers list");
}
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
@@ -83,11 +91,8 @@ impl ComputeNode {
let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
cfg.spec.safekeeper_connstrings.join(",")
);
// $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!("ALTER SYSTEM SET neon.safekeepers='{safekeepers_str}'");
client
.query(&safekeepers_sql, &[])
.await
@@ -128,8 +133,6 @@ impl ComputeNode {
}
// Already checked validity in http handler
#[allow(unused_mut)]
let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
{
let mut state = self.state.lock().unwrap();
@@ -161,7 +164,10 @@ impl ComputeNode {
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()?;
// reconfigure calls apply_spec_sql which blocks on a current runtime. To avoid panicking
// due to nested runtimes, wait on this task in a blocking way
let this = self.clone();
tokio::task::spawn_blocking(move || this.reconfigure()).await??;
let reconfigure_time_ms = now.elapsed().as_millis() as u32;
Ok(PromoteState::Completed {

View File

@@ -184,11 +184,14 @@ def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv):
once, and no error is thrown
"""
env: NeonEnv = neon_simple_env
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
primary: Endpoint = env.endpoints.create(branch_name="main", endpoint_id="primary")
# Also test catalog updates don't trigger any issues
primary.respec(skip_pg_catalog_updates=False)
primary.start()
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")