Report timespans for promotion and prewarm (#12730)

- Return sub-action timespans for prewarm, prewarm offload, and
promotion in the HTTP handlers; a sketch of the resulting response
shape follows this list.
- Set `synchronous_standby_names=walproposer` for promoted endpoints;
the statements involved are sketched after the commit metadata below.
Otherwise, the walproposer on the promoted standby ignores replies from
the safekeeper and gets stuck waiting for the commit LSN forever.
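
The enriched prewarm status is carried by the `LfcPrewarmState::Completed` variant introduced in this commit. As a minimal sketch (not part of the diff, with invented field values and the serde/serde_json crates assumed), trimming the enum to that one variant shows how the timespans appear in the JSON returned by the prewarm status handler:

    use serde::Serialize;

    #[derive(Serialize)]
    #[serde(tag = "status", rename_all = "snake_case")]
    enum LfcPrewarmState {
        // Only the variant relevant to this sketch; the real enum has more.
        Completed {
            total: i32,
            prewarmed: i32,
            skipped: i32,
            state_download_time_ms: u32,
            uncompress_time_ms: u32,
            prewarm_time_ms: u32,
        },
    }

    fn main() {
        // Invented numbers purely for illustration.
        let state = LfcPrewarmState::Completed {
            total: 100,
            prewarmed: 95,
            skipped: 5,
            state_download_time_ms: 12,
            uncompress_time_ms: 3,
            prewarm_time_ms: 250,
        };
        // Prints: {"status":"completed","total":100,"prewarmed":95,"skipped":5,
        //          "state_download_time_ms":12,"uncompress_time_ms":3,"prewarm_time_ms":250}
        println!("{}", serde_json::to_string(&state).unwrap());
    }
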
Author: Mikhail
Date: 2025-07-31 12:51:19 +01:00
Committed by: GitHub
Parent: b4a63e0a34
Commit: df4e37b7cc
8 changed files with 209 additions and 149 deletions
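
For the second bullet, here is a minimal standalone sketch of the statements the promotion path now runs, assuming a plain tokio-postgres connection (the connection string and error handling are illustrative; the real code reuses compute_ctl's maintenance client):

    use tokio_postgres::NoTls;

    #[tokio::main]
    async fn main() -> Result<(), tokio_postgres::Error> {
        // Illustrative connection string, not taken from this commit.
        let (client, connection) =
            tokio_postgres::connect("host=localhost user=postgres", NoTls).await?;
        // The connection object drives the actual I/O; run it in the background.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {e}");
            }
        });

        // Make the promoted standby's walproposer a synchronous standby so that
        // commit acknowledgements from safekeepers are honoured. ALTER SYSTEM SET
        // does not accept bound parameters, hence the literal statement.
        client
            .query("ALTER SYSTEM SET synchronous_standby_names=walproposer", &[])
            .await?;
        // Apply the changed setting without a restart.
        client
            .query("SELECT pg_catalog.pg_reload_conf()", &[])
            .await?;
        Ok(())
    }
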

View File

@@ -2780,7 +2780,7 @@ LIMIT 100",
     // 4. We start again and try to prewarm with the state from 2. instead of the previous complete state
     if matches!(
         prewarm_state,
-        LfcPrewarmState::Completed
+        LfcPrewarmState::Completed { .. }
             | LfcPrewarmState::NotPrewarmed
             | LfcPrewarmState::Skipped
     ) {

View File

@@ -7,19 +7,11 @@ use http::StatusCode;
 use reqwest::Client;
 use std::mem::replace;
 use std::sync::Arc;
+use std::time::Instant;
 use tokio::{io::AsyncReadExt, select, spawn};
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info};
 
-#[derive(serde::Serialize, Default)]
-pub struct LfcPrewarmStateWithProgress {
-    #[serde(flatten)]
-    base: LfcPrewarmState,
-    total: i32,
-    prewarmed: i32,
-    skipped: i32,
-}
-
 /// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
 struct EndpointStoragePair {
     url: String,
@@ -28,7 +20,7 @@ struct EndpointStoragePair {
 const KEY: &str = "lfc_state";
 
 impl EndpointStoragePair {
-    /// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
+    /// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
     /// If not None, takes precedence over pspec.spec.endpoint_id
     fn from_spec_and_endpoint(
         pspec: &crate::compute::ParsedSpec,
@@ -54,36 +46,8 @@
 }
 
 impl ComputeNode {
-    // If prewarm failed, we want to get overall number of segments as well as done ones.
-    // However, this function should be reliable even if querying postgres failed.
-    pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
-        info!("requesting LFC prewarm state from postgres");
-        let mut state = LfcPrewarmStateWithProgress::default();
-        {
-            state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
-        }
-
-        let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
-            Ok(client) => client,
-            Err(err) => {
-                error!(%err, "connecting to postgres");
-                return state;
-            }
-        };
-        let row = match client
-            .query_one("select * from neon.get_prewarm_info()", &[])
-            .await
-        {
-            Ok(row) => row,
-            Err(err) => {
-                error!(%err, "querying LFC prewarm status");
-                return state;
-            }
-        };
-        state.total = row.try_get(0).unwrap_or_default();
-        state.prewarmed = row.try_get(1).unwrap_or_default();
-        state.skipped = row.try_get(2).unwrap_or_default();
-        state
+    pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
+        self.state.lock().unwrap().lfc_prewarm_state.clone()
     }
 
     pub fn lfc_offload_state(&self) -> LfcOffloadState {
@@ -133,7 +97,6 @@
     }
 
     /// Request LFC state from endpoint storage and load corresponding pages into Postgres.
-    /// Returns a result with `false` if the LFC state is not found in endpoint storage.
     async fn prewarm_impl(
         &self,
         from_endpoint: Option<String>,
@@ -148,6 +111,7 @@
         fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
         info!(%url, "requesting LFC state from endpoint storage");
+        let mut now = Instant::now();
         let request = Client::new().get(&url).bearer_auth(storage_token);
         let response = select! {
             _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
@@ -160,6 +124,8 @@
             StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
             status => bail!("{status} querying endpoint storage"),
         }
+        let state_download_time_ms = now.elapsed().as_millis() as u32;
+        now = Instant::now();
 
         let mut uncompressed = Vec::new();
         let lfc_state = select! {
@@ -174,6 +140,8 @@
             read = decoder.read_to_end(&mut uncompressed) => read
         }
         .context("decoding LFC state")?;
+        let uncompress_time_ms = now.elapsed().as_millis() as u32;
+        now = Instant::now();
 
         let uncompressed_len = uncompressed.len();
         info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
@@ -196,15 +164,34 @@
         }
         .context("loading LFC state into postgres")
         .map(|_| ())?;
+        let prewarm_time_ms = now.elapsed().as_millis() as u32;
 
-        Ok(LfcPrewarmState::Completed)
+        let row = client
+            .query_one("select * from neon.get_prewarm_info()", &[])
+            .await
+            .context("querying prewarm info")?;
+        let total = row.try_get(0).unwrap_or_default();
+        let prewarmed = row.try_get(1).unwrap_or_default();
+        let skipped = row.try_get(2).unwrap_or_default();
+        Ok(LfcPrewarmState::Completed {
+            total,
+            prewarmed,
+            skipped,
+            state_download_time_ms,
+            uncompress_time_ms,
+            prewarm_time_ms,
+        })
     }
 
     /// If offload request is ongoing, return false, true otherwise
     pub fn offload_lfc(self: &Arc<Self>) -> bool {
         {
             let state = &mut self.state.lock().unwrap().lfc_offload_state;
-            if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
+            if matches!(
+                replace(state, LfcOffloadState::Offloading),
+                LfcOffloadState::Offloading
+            ) {
                 return false;
             }
         }
@@ -216,7 +203,10 @@
     pub async fn offload_lfc_async(self: &Arc<Self>) {
         {
             let state = &mut self.state.lock().unwrap().lfc_offload_state;
-            if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
+            if matches!(
+                replace(state, LfcOffloadState::Offloading),
+                LfcOffloadState::Offloading
+            ) {
                 return;
             }
         }
@@ -234,7 +224,6 @@
                 LfcOffloadState::Failed { error }
             }
         };
-
         self.state.lock().unwrap().lfc_offload_state = state;
     }
@@ -242,6 +231,7 @@
         let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
         info!(%url, "requesting LFC state from Postgres");
+        let mut now = Instant::now();
         let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
             .await
             .context("connecting to postgres")?
@@ -255,25 +245,36 @@
             info!(%url, "empty LFC state, not exporting");
             return Ok(LfcOffloadState::Skipped);
         };
+        let state_query_time_ms = now.elapsed().as_millis() as u32;
+        now = Instant::now();
 
         let mut compressed = Vec::new();
         ZstdEncoder::new(state)
             .read_to_end(&mut compressed)
             .await
             .context("compressing LFC state")?;
+        let compress_time_ms = now.elapsed().as_millis() as u32;
+        now = Instant::now();
 
         let compressed_len = compressed.len();
-        info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
+        info!(%url, "downloaded LFC state, compressed size {compressed_len}");
         let request = Client::new().put(url).bearer_auth(token).body(compressed);
-        match request.send().await {
-            Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
-            Ok(res) => bail!(
-                "Request to endpoint storage failed with status: {}",
-                res.status()
-            ),
-            Err(err) => Err(err).context("writing to endpoint storage"),
+        let response = request
+            .send()
+            .await
+            .context("writing to endpoint storage")?;
+        let state_upload_time_ms = now.elapsed().as_millis() as u32;
+        let status = response.status();
+        if status != StatusCode::OK {
+            bail!("request to endpoint storage failed: {status}");
         }
+        Ok(LfcOffloadState::Completed {
+            compress_time_ms,
+            state_query_time_ms,
+            state_upload_time_ms,
+        })
     }
 
     pub fn cancel_prewarm(self: &Arc<Self>) {

View File

@@ -1,32 +1,24 @@
 use crate::compute::ComputeNode;
-use anyhow::{Context, Result, bail};
+use anyhow::{Context, bail};
 use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
-use compute_api::spec::ComputeMode;
-use itertools::Itertools;
-use std::collections::HashMap;
-use std::{sync::Arc, time::Duration};
-use tokio::time::sleep;
+use std::time::Instant;
 use tracing::info;
-use utils::lsn::Lsn;
 
 impl ComputeNode {
-    /// Returns only when promote fails or succeeds. If a network error occurs
-    /// and http client disconnects, this does not stop promotion, and subsequent
-    /// calls block until promote finishes.
+    /// Returns only when promote fails or succeeds. If http client calling this function
+    /// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
     /// Called by control plane on secondary after primary endpoint is terminated
     /// Has a failpoint "compute-promotion"
-    pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
-        let cloned = self.clone();
-        let promote_fn = async move || {
-            let Err(err) = cloned.promote_impl(cfg).await else {
-                return PromoteState::Completed;
-            };
-            tracing::error!(%err, "promoting");
-            PromoteState::Failed {
-                error: format!("{err:#}"),
-            }
+    pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
+        let this = self.clone();
+        let promote_fn = async move || match this.promote_impl(cfg).await {
+            Ok(state) => state,
+            Err(err) => {
+                tracing::error!(%err, "promoting replica");
+                let error = format!("{err:#}");
+                PromoteState::Failed { error }
+            }
         };
         let start_promotion = || {
             let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
             tokio::spawn(async move { tx.send(promote_fn().await) });
@@ -34,36 +26,31 @@ impl ComputeNode {
         };
         let mut task;
-        // self.state is unlocked after block ends so we lock it in promote_impl
-        // and task.changed() is reached
+        // promote_impl locks self.state so we need to unlock it before calling task.changed()
         {
-            task = self
-                .state
-                .lock()
-                .unwrap()
-                .promote_state
-                .get_or_insert_with(start_promotion)
-                .clone()
+            let promote_state = &mut self.state.lock().unwrap().promote_state;
+            task = promote_state.get_or_insert_with(start_promotion).clone()
+        }
+        if task.changed().await.is_err() {
+            let error = "promote sender dropped".to_string();
+            return PromoteState::Failed { error };
         }
-        task.changed().await.expect("promote sender dropped");
         task.borrow().clone()
     }
 
-    async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
+    async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
         {
             let state = self.state.lock().unwrap();
             let mode = &state.pspec.as_ref().unwrap().spec.mode;
-            if *mode != ComputeMode::Replica {
-                bail!("{} is not replica", mode.to_type_str());
+            if *mode != compute_api::spec::ComputeMode::Replica {
+                bail!("compute mode \"{}\" is not replica", mode.to_type_str());
             }
 
-            // we don't need to query Postgres so not self.lfc_prewarm_state()
             match &state.lfc_prewarm_state {
-                LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
-                    bail!("prewarm not requested or pending")
+                status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
+                    bail!("compute {status}")
                 }
                 LfcPrewarmState::Failed { error } => {
-                    tracing::warn!(%error, "replica prewarm failed")
+                    tracing::warn!(%error, "compute prewarm failed")
                 }
                 _ => {}
             }
@@ -72,9 +59,10 @@
         let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
             .await
             .context("connecting to postgres")?;
+        let mut now = Instant::now();
 
         let primary_lsn = cfg.wal_flush_lsn;
-        let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
+        let mut standby_lsn = utils::lsn::Lsn::INVALID;
         const RETRIES: i32 = 20;
         for i in 0..=RETRIES {
             let row = client
@@ -82,16 +70,18 @@
                 .await
                 .context("getting last replay lsn")?;
             let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
-            last_wal_replay_lsn = lsn.into();
-            if last_wal_replay_lsn >= primary_lsn {
+            standby_lsn = lsn.into();
+            if standby_lsn >= primary_lsn {
                 break;
             }
-            info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
-            sleep(Duration::from_secs(1)).await;
+            info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
         }
-        if last_wal_replay_lsn < primary_lsn {
+        if standby_lsn < primary_lsn {
             bail!("didn't catch up with primary in {RETRIES} retries");
         }
+        let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
+        now = Instant::now();
 
         // using $1 doesn't work with ALTER SYSTEM SET
         let safekeepers_sql = format!(
@@ -102,27 +92,33 @@
             .query(&safekeepers_sql, &[])
             .await
             .context("setting safekeepers")?;
+        client
+            .query(
+                "ALTER SYSTEM SET synchronous_standby_names=walproposer",
+                &[],
+            )
+            .await
+            .context("setting synchronous_standby_names")?;
         client
             .query("SELECT pg_catalog.pg_reload_conf()", &[])
             .await
             .context("reloading postgres config")?;
 
         #[cfg(feature = "testing")]
-        fail::fail_point!("compute-promotion", |_| {
-            bail!("promotion configured to fail because of a failpoint")
-        });
+        fail::fail_point!("compute-promotion", |_| bail!(
+            "compute-promotion failpoint"
+        ));
 
         let row = client
             .query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
             .await
             .context("pg_promote")?;
         if !row.get::<usize, bool>(0) {
-            bail!("pg_promote() returned false");
+            bail!("pg_promote() failed");
         }
+        let pg_promote_time_ms = now.elapsed().as_millis() as u32;
+        let now = Instant::now();
 
+        let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
+            .await
+            .context("connecting to postgres")?;
         let row = client
             .query_one("SHOW transaction_read_only", &[])
             .await
@@ -131,36 +127,47 @@
             bail!("replica in read only mode after promotion");
         }
 
+        // Already checked validity in http handler
+        #[allow(unused_mut)]
+        let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
         {
             let mut state = self.state.lock().unwrap();
-            let spec = &mut state.pspec.as_mut().unwrap().spec;
-            spec.mode = ComputeMode::Primary;
-            let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
-            let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
-            Self::merge_spec(new_conf, existing_conf);
+            // Local setup has different ports for pg process (port=) for primary and secondary.
+            // Primary is stopped so we need secondary's "port" value
+            #[cfg(feature = "testing")]
+            {
+                let old_spec = &state.pspec.as_ref().unwrap().spec;
+                let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
+                    bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
+                };
+                let set: std::collections::HashMap<&str, &str> = old_conf
+                    .split_terminator('\n')
+                    .map(|e| e.split_once("=").expect("invalid item"))
+                    .collect();
+                let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
+                    bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
+                };
+                new_conf.push_str(&format!("port={}\n", set["port"]));
+            }
+            tracing::debug!("applied spec: {:#?}", new_pspec.spec);
+            if self.params.lakebase_mode {
+                ComputeNode::set_spec(&self.params, &mut state, new_pspec);
+            } else {
+                state.pspec = Some(new_pspec);
+            }
         }
 
         info!("applied new spec, reconfiguring as primary");
-        self.reconfigure()
-    }
-
-    /// Merge old and new Postgres conf specs to apply on secondary.
-    /// Change new spec's port and safekeepers since they are supplied
-    /// differenly
-    fn merge_spec(new_conf: &mut String, existing_conf: &str) {
-        let mut new_conf_set: HashMap<&str, &str> = new_conf
-            .split_terminator('\n')
-            .map(|e| e.split_once("=").expect("invalid item"))
-            .collect();
-        new_conf_set.remove("neon.safekeepers");
-
-        let existing_conf_set: HashMap<&str, &str> = existing_conf
-            .split_terminator('\n')
-            .map(|e| e.split_once("=").expect("invalid item"))
-            .collect();
-        new_conf_set.insert("port", existing_conf_set["port"]);
-        *new_conf = new_conf_set
-            .iter()
-            .map(|(k, v)| format!("{k}={v}"))
-            .join("\n");
+        self.reconfigure()?;
+        let reconfigure_time_ms = now.elapsed().as_millis() as u32;
+        Ok(PromoteState::Completed {
+            lsn_wait_time_ms,
+            pg_promote_time_ms,
+            reconfigure_time_ms,
+        })
     }
 }

View File

@@ -617,9 +617,6 @@ components:
       type: object
      required:
         - status
-        - total
-        - prewarmed
-        - skipped
       properties:
         status:
           description: LFC prewarm status
@@ -637,6 +634,15 @@
         skipped:
           description: Pages processed but not prewarmed
           type: integer
+        state_download_time_ms:
+          description: Time it takes to download LFC state to compute
+          type: integer
+        uncompress_time_ms:
+          description: Time it takes to uncompress LFC state
+          type: integer
+        prewarm_time_ms:
+          description: Time it takes to prewarm LFC state in Postgres
+          type: integer
 
     LfcOffloadState:
       type: object
@@ -650,6 +656,16 @@
         error:
           description: LFC offload error, if any
           type: string
+        state_query_time_ms:
+          description: Time it takes to get LFC state from Postgres
+          type: integer
+        compress_time_ms:
+          description: Time it takes to compress LFC state
+          type: integer
+        state_upload_time_ms:
+          description: Time it takes to upload LFC state to endpoint storage
+          type: integer
 
     PromoteState:
       type: object
@@ -663,6 +679,15 @@
         error:
           description: Promote error, if any
           type: string
+        lsn_wait_time_ms:
+          description: Time it takes for secondary to catch up with primary WAL flush LSN
+          type: integer
+        pg_promote_time_ms:
+          description: Time it takes to call pg_promote on secondary
+          type: integer
+        reconfigure_time_ms:
+          description: Time it takes to reconfigure promoted secondary
+          type: integer
 
     SetRoleGrantsRequest:
       type: object

View File

@@ -1,12 +1,11 @@
-use crate::compute_prewarm::LfcPrewarmStateWithProgress;
 use crate::http::JsonResponse;
 use axum::response::{IntoResponse, Response};
 use axum::{Json, http::StatusCode};
 use axum_extra::extract::OptionalQuery;
-use compute_api::responses::LfcOffloadState;
+use compute_api::responses::{LfcOffloadState, LfcPrewarmState};
 
 type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
 
-pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
+pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmState> {
     Json(compute.lfc_prewarm_state().await)
 }

View File

@@ -1,11 +1,22 @@
 use crate::http::JsonResponse;
 use axum::extract::Json;
+use compute_api::responses::PromoteConfig;
 use http::StatusCode;
 
 pub(in crate::http) async fn promote(
     compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
-    Json(cfg): Json<compute_api::responses::PromoteConfig>,
+    Json(cfg): Json<PromoteConfig>,
 ) -> axum::response::Response {
+    // Return early at the cost of extra parsing spec
+    let pspec = match crate::compute::ParsedSpec::try_from(cfg.spec) {
+        Ok(p) => p,
+        Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
+    };
+    let cfg = PromoteConfig {
+        spec: pspec.spec,
+        wal_flush_lsn: cfg.wal_flush_lsn,
+    };
     let state = compute.promote(cfg).await;
     if let compute_api::responses::PromoteState::Failed { error: _ } = state {
         return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);

View File

@@ -1,10 +1,9 @@
 //! Structs representing the JSON formats used in the compute_ctl's HTTP API.
-use std::fmt::Display;
-
 use chrono::{DateTime, Utc};
 use jsonwebtoken::jwk::JwkSet;
 use serde::{Deserialize, Serialize, Serializer};
+use std::fmt::Display;
 
 use crate::privilege::Privilege;
 use crate::spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role};
@@ -49,7 +48,7 @@
 /// Status of the LFC prewarm process. The same state machine is reused for
 /// both autoprewarm (prewarm after compute/Postgres start using the previously
 /// stored LFC state) and explicit prewarming via API.
-#[derive(Serialize, Default, Debug, Clone, PartialEq)]
+#[derive(Serialize, Default, Debug, Clone)]
 #[serde(tag = "status", rename_all = "snake_case")]
 pub enum LfcPrewarmState {
     /// Default value when compute boots up.
@@ -59,7 +58,14 @@
     Prewarming,
     /// We found requested LFC state in the endpoint storage and
     /// completed prewarming successfully.
-    Completed,
+    Completed {
+        total: i32,
+        prewarmed: i32,
+        skipped: i32,
+        state_download_time_ms: u32,
+        uncompress_time_ms: u32,
+        prewarm_time_ms: u32,
+    },
     /// Unexpected error happened during prewarming. Note, `Not Found 404`
     /// response from the endpoint storage is explicitly excluded here
     /// because it can normally happen on the first compute start,
@@ -84,7 +90,7 @@
         match self {
             LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
             LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
-            LfcPrewarmState::Completed => f.write_str("Completed"),
+            LfcPrewarmState::Completed { .. } => f.write_str("Completed"),
             LfcPrewarmState::Skipped => f.write_str("Skipped"),
             LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
             LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
@@ -92,26 +98,36 @@
     }
 }
 
-#[derive(Serialize, Default, Debug, Clone, PartialEq)]
+#[derive(Serialize, Default, Debug, Clone)]
 #[serde(tag = "status", rename_all = "snake_case")]
 pub enum LfcOffloadState {
     #[default]
     NotOffloaded,
     Offloading,
-    Completed,
+    Completed {
+        state_query_time_ms: u32,
+        compress_time_ms: u32,
+        state_upload_time_ms: u32,
+    },
     Failed {
         error: String,
     },
+    /// LFC state was empty so it wasn't offloaded
     Skipped,
 }
 
-#[derive(Serialize, Debug, Clone, PartialEq)]
+#[derive(Serialize, Debug, Clone)]
 #[serde(tag = "status", rename_all = "snake_case")]
+/// Response of /promote
 pub enum PromoteState {
     NotPromoted,
-    Completed,
-    Failed { error: String },
+    Completed {
+        lsn_wait_time_ms: u32,
+        pg_promote_time_ms: u32,
+        reconfigure_time_ms: u32,
+    },
+    Failed {
+        error: String,
+    },
 }
 
 #[derive(Deserialize, Default, Debug)]

View File

@@ -145,6 +145,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
     stop_and_check_lsn(secondary, None)
 
     if method == PromoteMethod.COMPUTE_CTL:
+        log.info("Restarting primary to check new config")
         secondary.stop()
         # In production, compute ultimately receives new compute spec from cplane.
         secondary.respec(mode="Primary")