mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 21:10:38 +00:00
Compare commits
3 Commits
quantumish
...
alexk/fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
70a273081b | ||
|
|
312a74f11f | ||
|
|
df4e37b7cc |
@@ -2780,7 +2780,7 @@ LIMIT 100",
|
||||
// 4. We start again and try to prewarm with the state from 2. instead of the previous complete state
|
||||
if matches!(
|
||||
prewarm_state,
|
||||
LfcPrewarmState::Completed
|
||||
LfcPrewarmState::Completed { .. }
|
||||
| LfcPrewarmState::NotPrewarmed
|
||||
| LfcPrewarmState::Skipped
|
||||
) {
|
||||
|
||||
@@ -7,19 +7,11 @@ use http::StatusCode;
|
||||
use reqwest::Client;
|
||||
use std::mem::replace;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::{io::AsyncReadExt, select, spawn};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
|
||||
#[derive(serde::Serialize, Default)]
|
||||
pub struct LfcPrewarmStateWithProgress {
|
||||
#[serde(flatten)]
|
||||
base: LfcPrewarmState,
|
||||
total: i32,
|
||||
prewarmed: i32,
|
||||
skipped: i32,
|
||||
}
|
||||
|
||||
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
|
||||
struct EndpointStoragePair {
|
||||
url: String,
|
||||
@@ -28,7 +20,7 @@ struct EndpointStoragePair {
|
||||
|
||||
const KEY: &str = "lfc_state";
|
||||
impl EndpointStoragePair {
|
||||
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
|
||||
/// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
|
||||
/// If not None, takes precedence over pspec.spec.endpoint_id
|
||||
fn from_spec_and_endpoint(
|
||||
pspec: &crate::compute::ParsedSpec,
|
||||
@@ -54,36 +46,8 @@ impl EndpointStoragePair {
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
// If prewarm failed, we want to get overall number of segments as well as done ones.
|
||||
// However, this function should be reliable even if querying postgres failed.
|
||||
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
|
||||
info!("requesting LFC prewarm state from postgres");
|
||||
let mut state = LfcPrewarmStateWithProgress::default();
|
||||
{
|
||||
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
|
||||
}
|
||||
|
||||
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
|
||||
Ok(client) => client,
|
||||
Err(err) => {
|
||||
error!(%err, "connecting to postgres");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
let row = match client
|
||||
.query_one("select * from neon.get_prewarm_info()", &[])
|
||||
.await
|
||||
{
|
||||
Ok(row) => row,
|
||||
Err(err) => {
|
||||
error!(%err, "querying LFC prewarm status");
|
||||
return state;
|
||||
}
|
||||
};
|
||||
state.total = row.try_get(0).unwrap_or_default();
|
||||
state.prewarmed = row.try_get(1).unwrap_or_default();
|
||||
state.skipped = row.try_get(2).unwrap_or_default();
|
||||
state
|
||||
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
|
||||
self.state.lock().unwrap().lfc_prewarm_state.clone()
|
||||
}
|
||||
|
||||
pub fn lfc_offload_state(&self) -> LfcOffloadState {
|
||||
@@ -133,7 +97,6 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
|
||||
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
|
||||
async fn prewarm_impl(
|
||||
&self,
|
||||
from_endpoint: Option<String>,
|
||||
@@ -148,6 +111,7 @@ impl ComputeNode {
|
||||
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
|
||||
|
||||
info!(%url, "requesting LFC state from endpoint storage");
|
||||
let mut now = Instant::now();
|
||||
let request = Client::new().get(&url).bearer_auth(storage_token);
|
||||
let response = select! {
|
||||
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
|
||||
@@ -160,6 +124,8 @@ impl ComputeNode {
|
||||
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
|
||||
status => bail!("{status} querying endpoint storage"),
|
||||
}
|
||||
let state_download_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let mut uncompressed = Vec::new();
|
||||
let lfc_state = select! {
|
||||
@@ -174,6 +140,8 @@ impl ComputeNode {
|
||||
read = decoder.read_to_end(&mut uncompressed) => read
|
||||
}
|
||||
.context("decoding LFC state")?;
|
||||
let uncompress_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let uncompressed_len = uncompressed.len();
|
||||
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
|
||||
@@ -196,15 +164,34 @@ impl ComputeNode {
|
||||
}
|
||||
.context("loading LFC state into postgres")
|
||||
.map(|_| ())?;
|
||||
let prewarm_time_ms = now.elapsed().as_millis() as u32;
|
||||
|
||||
Ok(LfcPrewarmState::Completed)
|
||||
let row = client
|
||||
.query_one("select * from neon.get_prewarm_info()", &[])
|
||||
.await
|
||||
.context("querying prewarm info")?;
|
||||
let total = row.try_get(0).unwrap_or_default();
|
||||
let prewarmed = row.try_get(1).unwrap_or_default();
|
||||
let skipped = row.try_get(2).unwrap_or_default();
|
||||
|
||||
Ok(LfcPrewarmState::Completed {
|
||||
total,
|
||||
prewarmed,
|
||||
skipped,
|
||||
state_download_time_ms,
|
||||
uncompress_time_ms,
|
||||
prewarm_time_ms,
|
||||
})
|
||||
}
|
||||
|
||||
/// If offload request is ongoing, return false, true otherwise
|
||||
pub fn offload_lfc(self: &Arc<Self>) -> bool {
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_offload_state;
|
||||
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
|
||||
if matches!(
|
||||
replace(state, LfcOffloadState::Offloading),
|
||||
LfcOffloadState::Offloading
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -216,7 +203,10 @@ impl ComputeNode {
|
||||
pub async fn offload_lfc_async(self: &Arc<Self>) {
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_offload_state;
|
||||
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
|
||||
if matches!(
|
||||
replace(state, LfcOffloadState::Offloading),
|
||||
LfcOffloadState::Offloading
|
||||
) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -234,7 +224,6 @@ impl ComputeNode {
|
||||
LfcOffloadState::Failed { error }
|
||||
}
|
||||
};
|
||||
|
||||
self.state.lock().unwrap().lfc_offload_state = state;
|
||||
}
|
||||
|
||||
@@ -242,6 +231,7 @@ impl ComputeNode {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
|
||||
info!(%url, "requesting LFC state from Postgres");
|
||||
|
||||
let mut now = Instant::now();
|
||||
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?
|
||||
@@ -255,25 +245,36 @@ impl ComputeNode {
|
||||
info!(%url, "empty LFC state, not exporting");
|
||||
return Ok(LfcOffloadState::Skipped);
|
||||
};
|
||||
let state_query_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let mut compressed = Vec::new();
|
||||
ZstdEncoder::new(state)
|
||||
.read_to_end(&mut compressed)
|
||||
.await
|
||||
.context("compressing LFC state")?;
|
||||
let compress_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
let compressed_len = compressed.len();
|
||||
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
|
||||
info!(%url, "downloaded LFC state, compressed size {compressed_len}");
|
||||
|
||||
let request = Client::new().put(url).bearer_auth(token).body(compressed);
|
||||
match request.send().await {
|
||||
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
|
||||
Ok(res) => bail!(
|
||||
"Request to endpoint storage failed with status: {}",
|
||||
res.status()
|
||||
),
|
||||
Err(err) => Err(err).context("writing to endpoint storage"),
|
||||
let response = request
|
||||
.send()
|
||||
.await
|
||||
.context("writing to endpoint storage")?;
|
||||
let state_upload_time_ms = now.elapsed().as_millis() as u32;
|
||||
let status = response.status();
|
||||
if status != StatusCode::OK {
|
||||
bail!("request to endpoint storage failed: {status}");
|
||||
}
|
||||
|
||||
Ok(LfcOffloadState::Completed {
|
||||
compress_time_ms,
|
||||
state_query_time_ms,
|
||||
state_upload_time_ms,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn cancel_prewarm(self: &Arc<Self>) {
|
||||
|
||||
@@ -1,32 +1,24 @@
|
||||
use crate::compute::ComputeNode;
|
||||
use anyhow::{Context, Result, bail};
|
||||
use anyhow::{Context, bail};
|
||||
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
|
||||
use compute_api::spec::ComputeMode;
|
||||
use itertools::Itertools;
|
||||
use std::collections::HashMap;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::time::sleep;
|
||||
use std::time::Instant;
|
||||
use tracing::info;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
impl ComputeNode {
|
||||
/// Returns only when promote fails or succeeds. If a network error occurs
|
||||
/// and http client disconnects, this does not stop promotion, and subsequent
|
||||
/// calls block until promote finishes.
|
||||
/// Returns only when promote fails or succeeds. If http client calling this function
|
||||
/// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
|
||||
/// Called by control plane on secondary after primary endpoint is terminated
|
||||
/// Has a failpoint "compute-promotion"
|
||||
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
|
||||
let cloned = self.clone();
|
||||
let promote_fn = async move || {
|
||||
let Err(err) = cloned.promote_impl(cfg).await else {
|
||||
return PromoteState::Completed;
|
||||
};
|
||||
tracing::error!(%err, "promoting");
|
||||
PromoteState::Failed {
|
||||
error: format!("{err:#}"),
|
||||
pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
|
||||
let this = self.clone();
|
||||
let promote_fn = async move || match this.promote_impl(cfg).await {
|
||||
Ok(state) => state,
|
||||
Err(err) => {
|
||||
tracing::error!(%err, "promoting replica");
|
||||
let error = format!("{err:#}");
|
||||
PromoteState::Failed { error }
|
||||
}
|
||||
};
|
||||
|
||||
let start_promotion = || {
|
||||
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
|
||||
tokio::spawn(async move { tx.send(promote_fn().await) });
|
||||
@@ -34,36 +26,31 @@ impl ComputeNode {
|
||||
};
|
||||
|
||||
let mut task;
|
||||
// self.state is unlocked after block ends so we lock it in promote_impl
|
||||
// and task.changed() is reached
|
||||
// promote_impl locks self.state so we need to unlock it before calling task.changed()
|
||||
{
|
||||
task = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.promote_state
|
||||
.get_or_insert_with(start_promotion)
|
||||
.clone()
|
||||
let promote_state = &mut self.state.lock().unwrap().promote_state;
|
||||
task = promote_state.get_or_insert_with(start_promotion).clone()
|
||||
}
|
||||
if task.changed().await.is_err() {
|
||||
let error = "promote sender dropped".to_string();
|
||||
return PromoteState::Failed { error };
|
||||
}
|
||||
task.changed().await.expect("promote sender dropped");
|
||||
task.borrow().clone()
|
||||
}
|
||||
|
||||
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
|
||||
async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
|
||||
{
|
||||
let state = self.state.lock().unwrap();
|
||||
let mode = &state.pspec.as_ref().unwrap().spec.mode;
|
||||
if *mode != ComputeMode::Replica {
|
||||
bail!("{} is not replica", mode.to_type_str());
|
||||
if *mode != compute_api::spec::ComputeMode::Replica {
|
||||
bail!("compute mode \"{}\" is not replica", mode.to_type_str());
|
||||
}
|
||||
|
||||
// we don't need to query Postgres so not self.lfc_prewarm_state()
|
||||
match &state.lfc_prewarm_state {
|
||||
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
|
||||
bail!("prewarm not requested or pending")
|
||||
status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
|
||||
bail!("compute {status}")
|
||||
}
|
||||
LfcPrewarmState::Failed { error } => {
|
||||
tracing::warn!(%error, "replica prewarm failed")
|
||||
tracing::warn!(%error, "compute prewarm failed")
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@@ -72,9 +59,10 @@ impl ComputeNode {
|
||||
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?;
|
||||
let mut now = Instant::now();
|
||||
|
||||
let primary_lsn = cfg.wal_flush_lsn;
|
||||
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
|
||||
let mut standby_lsn = utils::lsn::Lsn::INVALID;
|
||||
const RETRIES: i32 = 20;
|
||||
for i in 0..=RETRIES {
|
||||
let row = client
|
||||
@@ -82,16 +70,18 @@ impl ComputeNode {
|
||||
.await
|
||||
.context("getting last replay lsn")?;
|
||||
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
|
||||
last_wal_replay_lsn = lsn.into();
|
||||
if last_wal_replay_lsn >= primary_lsn {
|
||||
standby_lsn = lsn.into();
|
||||
if standby_lsn >= primary_lsn {
|
||||
break;
|
||||
}
|
||||
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
|
||||
sleep(Duration::from_secs(1)).await;
|
||||
info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
if last_wal_replay_lsn < primary_lsn {
|
||||
if standby_lsn < primary_lsn {
|
||||
bail!("didn't catch up with primary in {RETRIES} retries");
|
||||
}
|
||||
let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
|
||||
now = Instant::now();
|
||||
|
||||
// using $1 doesn't work with ALTER SYSTEM SET
|
||||
let safekeepers_sql = format!(
|
||||
@@ -102,27 +92,33 @@ impl ComputeNode {
|
||||
.query(&safekeepers_sql, &[])
|
||||
.await
|
||||
.context("setting safekeepers")?;
|
||||
client
|
||||
.query(
|
||||
"ALTER SYSTEM SET synchronous_standby_names=walproposer",
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.context("setting synchronous_standby_names")?;
|
||||
client
|
||||
.query("SELECT pg_catalog.pg_reload_conf()", &[])
|
||||
.await
|
||||
.context("reloading postgres config")?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
fail::fail_point!("compute-promotion", |_| {
|
||||
bail!("promotion configured to fail because of a failpoint")
|
||||
});
|
||||
fail::fail_point!("compute-promotion", |_| bail!(
|
||||
"compute-promotion failpoint"
|
||||
));
|
||||
|
||||
let row = client
|
||||
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
|
||||
.await
|
||||
.context("pg_promote")?;
|
||||
if !row.get::<usize, bool>(0) {
|
||||
bail!("pg_promote() returned false");
|
||||
bail!("pg_promote() failed");
|
||||
}
|
||||
let pg_promote_time_ms = now.elapsed().as_millis() as u32;
|
||||
let now = Instant::now();
|
||||
|
||||
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
|
||||
.await
|
||||
.context("connecting to postgres")?;
|
||||
let row = client
|
||||
.query_one("SHOW transaction_read_only", &[])
|
||||
.await
|
||||
@@ -131,36 +127,47 @@ impl ComputeNode {
|
||||
bail!("replica in read only mode after promotion");
|
||||
}
|
||||
|
||||
// Already checked validity in http handler
|
||||
#[allow(unused_mut)]
|
||||
let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
|
||||
{
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let spec = &mut state.pspec.as_mut().unwrap().spec;
|
||||
spec.mode = ComputeMode::Primary;
|
||||
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
|
||||
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
|
||||
Self::merge_spec(new_conf, existing_conf);
|
||||
|
||||
// Local setup has different ports for pg process (port=) for primary and secondary.
|
||||
// Primary is stopped so we need secondary's "port" value
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
let old_spec = &state.pspec.as_ref().unwrap().spec;
|
||||
let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
|
||||
bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
|
||||
};
|
||||
let set: std::collections::HashMap<&str, &str> = old_conf
|
||||
.split_terminator('\n')
|
||||
.map(|e| e.split_once("=").expect("invalid item"))
|
||||
.collect();
|
||||
|
||||
let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
|
||||
bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
|
||||
};
|
||||
new_conf.push_str(&format!("port={}\n", set["port"]));
|
||||
}
|
||||
|
||||
tracing::debug!("applied spec: {:#?}", new_pspec.spec);
|
||||
if self.params.lakebase_mode {
|
||||
ComputeNode::set_spec(&self.params, &mut state, new_pspec);
|
||||
} else {
|
||||
state.pspec = Some(new_pspec);
|
||||
}
|
||||
}
|
||||
|
||||
info!("applied new spec, reconfiguring as primary");
|
||||
self.reconfigure()
|
||||
}
|
||||
self.reconfigure()?;
|
||||
let reconfigure_time_ms = now.elapsed().as_millis() as u32;
|
||||
|
||||
/// Merge old and new Postgres conf specs to apply on secondary.
|
||||
/// Change new spec's port and safekeepers since they are supplied
|
||||
/// differenly
|
||||
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
|
||||
let mut new_conf_set: HashMap<&str, &str> = new_conf
|
||||
.split_terminator('\n')
|
||||
.map(|e| e.split_once("=").expect("invalid item"))
|
||||
.collect();
|
||||
new_conf_set.remove("neon.safekeepers");
|
||||
|
||||
let existing_conf_set: HashMap<&str, &str> = existing_conf
|
||||
.split_terminator('\n')
|
||||
.map(|e| e.split_once("=").expect("invalid item"))
|
||||
.collect();
|
||||
new_conf_set.insert("port", existing_conf_set["port"]);
|
||||
*new_conf = new_conf_set
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{k}={v}"))
|
||||
.join("\n");
|
||||
Ok(PromoteState::Completed {
|
||||
lsn_wait_time_ms,
|
||||
pg_promote_time_ms,
|
||||
reconfigure_time_ms,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -617,9 +617,6 @@ components:
|
||||
type: object
|
||||
required:
|
||||
- status
|
||||
- total
|
||||
- prewarmed
|
||||
- skipped
|
||||
properties:
|
||||
status:
|
||||
description: LFC prewarm status
|
||||
@@ -637,6 +634,15 @@ components:
|
||||
skipped:
|
||||
description: Pages processed but not prewarmed
|
||||
type: integer
|
||||
state_download_time_ms:
|
||||
description: Time it takes to download LFC state to compute
|
||||
type: integer
|
||||
uncompress_time_ms:
|
||||
description: Time it takes to uncompress LFC state
|
||||
type: integer
|
||||
prewarm_time_ms:
|
||||
description: Time it takes to prewarm LFC state in Postgres
|
||||
type: integer
|
||||
|
||||
LfcOffloadState:
|
||||
type: object
|
||||
@@ -650,6 +656,16 @@ components:
|
||||
error:
|
||||
description: LFC offload error, if any
|
||||
type: string
|
||||
state_query_time_ms:
|
||||
description: Time it takes to get LFC state from Postgres
|
||||
type: integer
|
||||
compress_time_ms:
|
||||
description: Time it takes to compress LFC state
|
||||
type: integer
|
||||
state_upload_time_ms:
|
||||
description: Time it takes to upload LFC state to endpoint storage
|
||||
type: integer
|
||||
|
||||
|
||||
PromoteState:
|
||||
type: object
|
||||
@@ -663,6 +679,15 @@ components:
|
||||
error:
|
||||
description: Promote error, if any
|
||||
type: string
|
||||
lsn_wait_time_ms:
|
||||
description: Time it takes for secondary to catch up with primary WAL flush LSN
|
||||
type: integer
|
||||
pg_promote_time_ms:
|
||||
description: Time it takes to call pg_promote on secondary
|
||||
type: integer
|
||||
reconfigure_time_ms:
|
||||
description: Time it takes to reconfigure promoted secondary
|
||||
type: integer
|
||||
|
||||
SetRoleGrantsRequest:
|
||||
type: object
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
|
||||
use crate::http::JsonResponse;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Json, http::StatusCode};
|
||||
use axum_extra::extract::OptionalQuery;
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
use compute_api::responses::{LfcOffloadState, LfcPrewarmState};
|
||||
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
||||
|
||||
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
|
||||
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmState> {
|
||||
Json(compute.lfc_prewarm_state().await)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
use crate::http::JsonResponse;
|
||||
use axum::extract::Json;
|
||||
use compute_api::responses::PromoteConfig;
|
||||
use http::StatusCode;
|
||||
|
||||
pub(in crate::http) async fn promote(
|
||||
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
|
||||
Json(cfg): Json<compute_api::responses::PromoteConfig>,
|
||||
Json(cfg): Json<PromoteConfig>,
|
||||
) -> axum::response::Response {
|
||||
// Return early at the cost of extra parsing spec
|
||||
let pspec = match crate::compute::ParsedSpec::try_from(cfg.spec) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
|
||||
};
|
||||
|
||||
let cfg = PromoteConfig {
|
||||
spec: pspec.spec,
|
||||
wal_flush_lsn: cfg.wal_flush_lsn,
|
||||
};
|
||||
let state = compute.promote(cfg).await;
|
||||
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
|
||||
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);
|
||||
|
||||
@@ -407,8 +407,8 @@ fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> {
|
||||
// like `postgres_exporter` use it to query Postgres statistics.
|
||||
// Use explicit 8 bytes type casts to match Rust types.
|
||||
let stats = cli.query_one(
|
||||
"SELECT pg_catalog.coalesce(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time,
|
||||
pg_catalog.coalesce(pg_catalog.sum(sessions), 0)::pg_catalog.bigint AS total_sessions
|
||||
"SELECT COALESCE(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time,
|
||||
COALESCE(pg_catalog.sum(sessions), 0)::pg_catalog.int8 AS total_sessions
|
||||
FROM pg_catalog.pg_stat_database
|
||||
WHERE datname NOT IN (
|
||||
'postgres',
|
||||
|
||||
@@ -241,7 +241,7 @@ impl ComputeControlPlane {
|
||||
drop_subscriptions_before_start,
|
||||
grpc,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
features: vec![ComputeFeature::ActivityMonitorExperimental],
|
||||
cluster: None,
|
||||
compute_ctl_config: compute_ctl_config.clone(),
|
||||
privileged_role_name: privileged_role_name.clone(),
|
||||
@@ -263,7 +263,7 @@ impl ComputeControlPlane {
|
||||
skip_pg_catalog_updates,
|
||||
drop_subscriptions_before_start,
|
||||
reconfigure_concurrency: 1,
|
||||
features: vec![],
|
||||
features: vec![ComputeFeature::ActivityMonitorExperimental],
|
||||
cluster: None,
|
||||
compute_ctl_config,
|
||||
privileged_role_name,
|
||||
|
||||
@@ -303,6 +303,13 @@ enum Command {
|
||||
#[arg(long, required = true, value_delimiter = ',')]
|
||||
new_sk_set: Vec<NodeId>,
|
||||
},
|
||||
/// Abort ongoing safekeeper migration.
|
||||
TimelineSafekeeperMigrateAbort {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
#[arg(long)]
|
||||
timeline_id: TimelineId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -1396,6 +1403,17 @@ async fn main() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TimelineSafekeeperMigrateAbort {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} => {
|
||||
let path =
|
||||
format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort");
|
||||
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(Method::POST, path, None)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use jsonwebtoken::jwk::JwkSet;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::fmt::Display;
|
||||
|
||||
use crate::privilege::Privilege;
|
||||
use crate::spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role};
|
||||
@@ -49,7 +48,7 @@ pub struct ExtensionInstallResponse {
|
||||
/// Status of the LFC prewarm process. The same state machine is reused for
|
||||
/// both autoprewarm (prewarm after compute/Postgres start using the previously
|
||||
/// stored LFC state) and explicit prewarming via API.
|
||||
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcPrewarmState {
|
||||
/// Default value when compute boots up.
|
||||
@@ -59,7 +58,14 @@ pub enum LfcPrewarmState {
|
||||
Prewarming,
|
||||
/// We found requested LFC state in the endpoint storage and
|
||||
/// completed prewarming successfully.
|
||||
Completed,
|
||||
Completed {
|
||||
total: i32,
|
||||
prewarmed: i32,
|
||||
skipped: i32,
|
||||
state_download_time_ms: u32,
|
||||
uncompress_time_ms: u32,
|
||||
prewarm_time_ms: u32,
|
||||
},
|
||||
/// Unexpected error happened during prewarming. Note, `Not Found 404`
|
||||
/// response from the endpoint storage is explicitly excluded here
|
||||
/// because it can normally happen on the first compute start,
|
||||
@@ -84,7 +90,7 @@ impl Display for LfcPrewarmState {
|
||||
match self {
|
||||
LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
|
||||
LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
|
||||
LfcPrewarmState::Completed => f.write_str("Completed"),
|
||||
LfcPrewarmState::Completed { .. } => f.write_str("Completed"),
|
||||
LfcPrewarmState::Skipped => f.write_str("Skipped"),
|
||||
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
|
||||
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
|
||||
@@ -92,26 +98,36 @@ impl Display for LfcPrewarmState {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Default, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
pub enum LfcOffloadState {
|
||||
#[default]
|
||||
NotOffloaded,
|
||||
Offloading,
|
||||
Completed,
|
||||
Completed {
|
||||
state_query_time_ms: u32,
|
||||
compress_time_ms: u32,
|
||||
state_upload_time_ms: u32,
|
||||
},
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
/// LFC state was empty so it wasn't offloaded
|
||||
Skipped,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone, PartialEq)]
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
#[serde(tag = "status", rename_all = "snake_case")]
|
||||
/// Response of /promote
|
||||
pub enum PromoteState {
|
||||
NotPromoted,
|
||||
Completed,
|
||||
Failed { error: String },
|
||||
Completed {
|
||||
lsn_wait_time_ms: u32,
|
||||
pg_promote_time_ms: u32,
|
||||
reconfigure_time_ms: u32,
|
||||
},
|
||||
Failed {
|
||||
error: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Default, Debug)]
|
||||
|
||||
@@ -644,6 +644,7 @@ async fn handle_tenant_timeline_safekeeper_migrate(
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
// TODO(diko): it's not PS operation, there should be a different permission scope.
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
maybe_rate_limit(&req, tenant_id).await;
|
||||
|
||||
@@ -665,6 +666,23 @@ async fn handle_tenant_timeline_safekeeper_migrate(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_safekeeper_migrate_abort(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
// TODO(diko): it's not PS operation, there should be a different permission scope.
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
maybe_rate_limit(&req, tenant_id).await;
|
||||
|
||||
service
|
||||
.tenant_timeline_safekeeper_migrate_abort(tenant_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_lsn_lease(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -2611,6 +2629,16 @@ pub fn make_router(
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate_abort",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_timeline_safekeeper_migrate_abort,
|
||||
RequestName("v1_tenant_timeline_safekeeper_migrate_abort"),
|
||||
)
|
||||
},
|
||||
)
|
||||
// LSN lease passthrough to all shards
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
|
||||
|
||||
@@ -1230,10 +1230,7 @@ impl Service {
|
||||
}
|
||||
// It it is the same new_sk_set, we can continue the migration (retry).
|
||||
} else {
|
||||
let prev_finished = timeline.cplane_notified_generation == timeline.generation
|
||||
&& timeline.sk_set_notified_generation == timeline.generation;
|
||||
|
||||
if !prev_finished {
|
||||
if !is_migration_finished(&timeline) {
|
||||
// The previous migration is committed, but the finish step failed.
|
||||
// Safekeepers/cplane might not know about the last membership configuration.
|
||||
// Retry the finish step to ensure smooth migration.
|
||||
@@ -1545,6 +1542,8 @@ impl Service {
|
||||
timeline_id: TimelineId,
|
||||
timeline: &TimelinePersistence,
|
||||
) -> Result<(), ApiError> {
|
||||
tracing::info!(generation=?timeline.generation, sk_set=?timeline.sk_set, new_sk_set=?timeline.new_sk_set, "retrying finish safekeeper migration");
|
||||
|
||||
if timeline.new_sk_set.is_some() {
|
||||
// Logical error, should never happen.
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -1624,4 +1623,120 @@ impl Service {
|
||||
|
||||
Ok(wal_positions[quorum_size - 1])
|
||||
}
|
||||
|
||||
/// Abort ongoing safekeeper migration.
|
||||
pub(crate) async fn tenant_timeline_safekeeper_migrate_abort(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(), ApiError> {
|
||||
// TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
&self.tenant_op_locks,
|
||||
tenant_id,
|
||||
TenantOperations::TimelineSafekeeperMigrate,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Fetch current timeline configuration from the configuration storage.
|
||||
let timeline = self
|
||||
.persistence
|
||||
.get_timeline(tenant_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let Some(timeline) = timeline else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!(
|
||||
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
};
|
||||
|
||||
let mut generation = SafekeeperGeneration::new(timeline.generation as u32);
|
||||
|
||||
let Some(new_sk_set) = &timeline.new_sk_set else {
|
||||
// No new_sk_set -> no active migration that we can abort.
|
||||
tracing::info!("timeline has no active migration");
|
||||
|
||||
if !is_migration_finished(&timeline) {
|
||||
// The last migration is committed, but the finish step failed.
|
||||
// Safekeepers/cplane might not know about the last membership configuration.
|
||||
// Retry the finish step to make the timeline state clean.
|
||||
self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
|
||||
.await?;
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
tracing::info!(sk_set=?timeline.sk_set, ?new_sk_set, ?generation, "aborting timeline migration");
|
||||
|
||||
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
|
||||
let new_safekeepers = self.get_safekeepers(new_sk_set)?;
|
||||
|
||||
let cur_sk_member_set =
|
||||
Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
|
||||
|
||||
// Increment current generation and remove new_sk_set from the timeline to abort the migration.
|
||||
generation = generation.next();
|
||||
|
||||
let mconf = membership::Configuration {
|
||||
generation,
|
||||
members: cur_sk_member_set,
|
||||
new_members: None,
|
||||
};
|
||||
|
||||
// Exclude safekeepers which were added during the current migration.
|
||||
let cur_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
|
||||
let exclude_safekeepers = new_safekeepers
|
||||
.into_iter()
|
||||
.filter(|sk| !cur_ids.contains(&sk.get_id()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let exclude_requests = exclude_safekeepers
|
||||
.iter()
|
||||
.map(|sk| TimelinePendingOpPersistence {
|
||||
sk_id: sk.skp.id,
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
generation: generation.into_inner() as i32,
|
||||
op_kind: SafekeeperTimelineOpKind::Exclude,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let cur_sk_set = cur_safekeepers
|
||||
.iter()
|
||||
.map(|sk| sk.get_id())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Persist new mconf and exclude requests.
|
||||
self.persistence
|
||||
.update_timeline_membership(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
generation,
|
||||
&cur_sk_set,
|
||||
None,
|
||||
&exclude_requests,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// At this point we have already commited the abort, but still need to notify
|
||||
// cplane/safekeepers with the new mconf. That's what finish_safekeeper_migration does.
|
||||
self.finish_safekeeper_migration(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&cur_safekeepers,
|
||||
&mconf,
|
||||
&exclude_safekeepers,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn is_migration_finished(timeline: &TimelinePersistence) -> bool {
|
||||
timeline.cplane_notified_generation == timeline.generation
|
||||
&& timeline.sk_set_notified_generation == timeline.generation
|
||||
}
|
||||
|
||||
@@ -2323,6 +2323,19 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
response.raise_for_status()
|
||||
log.info(f"migrate_safekeepers success: {response.json()}")
|
||||
|
||||
def abort_safekeeper_migration(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
):
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort",
|
||||
headers=self.headers(TokenScope.PAGE_SERVER_API),
|
||||
)
|
||||
response.raise_for_status()
|
||||
log.info(f"abort_safekeeper_migration success: {response.json()}")
|
||||
|
||||
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
|
||||
"""
|
||||
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.metrics import parse_metrics
|
||||
@@ -9,13 +10,13 @@ if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_compute_monitor(neon_simple_env: NeonEnv):
|
||||
def test_compute_monitor_downtime_calculation(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can detect Postgres going down (unresponsive) and
|
||||
reconnect when it comes back online. Also check that the downtime metrics
|
||||
are properly emitted.
|
||||
"""
|
||||
TEST_DB = "test_compute_monitor"
|
||||
TEST_DB = "test_compute_monitor_downtime_calculation"
|
||||
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
@@ -68,3 +69,56 @@ def test_compute_monitor(neon_simple_env: NeonEnv):
|
||||
|
||||
# Just a sanity check that we log the downtime info
|
||||
endpoint.log_contains("downtime_info")
|
||||
|
||||
|
||||
def test_compute_monitor_activity(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test compute monitor correctly detects user activity inside Postgres
|
||||
and updates last_active timestamp in the /status response.
|
||||
"""
|
||||
TEST_DB = "test_compute_monitor_activity_db"
|
||||
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
with endpoint.cursor() as cursor:
|
||||
# Create a new database because `postgres` DB is excluded
|
||||
# from activity monitoring.
|
||||
cursor.execute(f"CREATE DATABASE {TEST_DB}")
|
||||
|
||||
client = endpoint.http_client()
|
||||
|
||||
prev_last_active = None
|
||||
|
||||
def check_last_active():
|
||||
nonlocal prev_last_active
|
||||
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
# Execute some dummy query to generate 'activity'.
|
||||
cursor.execute("SELECT * FROM generate_series(1, 10000)")
|
||||
|
||||
status = client.status()
|
||||
assert status["last_active"] is not None
|
||||
prev_last_active = status["last_active"]
|
||||
|
||||
wait_until(check_last_active)
|
||||
|
||||
assert prev_last_active is not None
|
||||
|
||||
# Sleep for everything to settle down. It's not strictly necessary,
|
||||
# but should still remove any potential noise and/or prevent test from passing
|
||||
# even if compute monitor is not working.
|
||||
time.sleep(3)
|
||||
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
cursor.execute("SELECT * FROM generate_series(1, 10000)")
|
||||
|
||||
def check_last_active_updated():
|
||||
nonlocal prev_last_active
|
||||
|
||||
status = client.status()
|
||||
assert status["last_active"] is not None
|
||||
assert status["last_active"] != prev_last_active
|
||||
assert status["last_active"] > prev_last_active
|
||||
|
||||
wait_until(check_last_active_updated)
|
||||
|
||||
@@ -145,6 +145,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
|
||||
stop_and_check_lsn(secondary, None)
|
||||
|
||||
if method == PromoteMethod.COMPUTE_CTL:
|
||||
log.info("Restarting primary to check new config")
|
||||
secondary.stop()
|
||||
# In production, compute ultimately receives new compute spec from cplane.
|
||||
secondary.respec(mode="Primary")
|
||||
|
||||
@@ -460,3 +460,91 @@ def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
ep.start(safekeeper_generation=5, safekeepers=new_sk_set2)
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
|
||||
|
||||
|
||||
def test_abort_safekeeper_migration(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that safekeeper migration can be aborted.
|
||||
1. Insert failpoints and ensure the abort successfully reverts the timeline state.
|
||||
2. Check that endpoint is operational after the abort.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 2
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
"timeline_safekeeper_count": 1,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert len(mconf["sk_set"]) == 1
|
||||
cur_sk = mconf["sk_set"][0]
|
||||
cur_gen = 1
|
||||
|
||||
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
|
||||
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
|
||||
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
|
||||
ep.safe_psql("CREATE TABLE t(a int)")
|
||||
ep.safe_psql("INSERT INTO t VALUES (1)")
|
||||
|
||||
another_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk][0]
|
||||
|
||||
failpoints = [
|
||||
"sk-migration-after-step-3",
|
||||
"sk-migration-after-step-4",
|
||||
"sk-migration-after-step-5",
|
||||
"sk-migration-after-step-7",
|
||||
]
|
||||
|
||||
for fp in failpoints:
|
||||
env.storage_controller.configure_failpoints((fp, "return(1)"))
|
||||
|
||||
with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"):
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, [another_sk]
|
||||
)
|
||||
cur_gen += 1
|
||||
|
||||
env.storage_controller.configure_failpoints((fp, "off"))
|
||||
|
||||
# We should have a joint mconf after the failure.
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["generation"] == cur_gen
|
||||
assert mconf["sk_set"] == [cur_sk]
|
||||
assert mconf["new_sk_set"] == [another_sk]
|
||||
|
||||
env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline)
|
||||
cur_gen += 1
|
||||
|
||||
# Abort should revert the timeline to the previous sk_set and increment the generation.
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["generation"] == cur_gen
|
||||
assert mconf["sk_set"] == [cur_sk]
|
||||
assert mconf["new_sk_set"] is None
|
||||
|
||||
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith(f"g#{cur_gen}:")
|
||||
ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})")
|
||||
|
||||
# After step-8 the final mconf is committed and the migration is not abortable anymore.
|
||||
# So the abort should not abort anything.
|
||||
env.storage_controller.configure_failpoints(("sk-migration-after-step-8", "return(1)"))
|
||||
|
||||
with pytest.raises(StorageControllerApiException, match="failpoint sk-migration-after-step-8"):
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, [another_sk]
|
||||
)
|
||||
cur_gen += 2
|
||||
|
||||
env.storage_controller.configure_failpoints((fp, "off"))
|
||||
|
||||
env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# The migration is fully committed, no abort should have been performed.
|
||||
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
|
||||
assert mconf["generation"] == cur_gen
|
||||
assert mconf["sk_set"] == [another_sk]
|
||||
assert mconf["new_sk_set"] is None
|
||||
|
||||
ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})")
|
||||
ep.clear_buffers()
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(i + 1,) for i in range(cur_gen) if i % 2 == 0]
|
||||
|
||||
Reference in New Issue
Block a user