Compare commits

..

10 Commits

Author SHA1 Message Date
Paul Banks
7a37d83bed Filter obsolete layer files from an older generation from heatmap 2025-08-15 16:21:39 +01:00
Ruslan Talpa
d96cea1917 [proxy] handle options request in rest broker (cors headers) (#12744)
## Problem
rest broker needs to respond with the correct cors headers for the api
to be usable from other domains

## Summary of changes
added a code path in rest broker to handle the OPTIONS requests

---------

Co-authored-by: Ruslan Talpa <ruslan.talpa@databricks.com>
2025-07-31 13:05:09 +00:00
Dmitrii Kovalkov
312a74f11f storcon: implement safekeeper_migrate_abort handler (#12705)
## Problem
Right now if we commit a joint configuration to DB, there is no way
back. The only way to get the clean mconf is to continue the migration.
The RFC also described an abort mechanism, which allows to abort current
migration and revert mconf change. It might be needed if the migration
is stuck and cannot have any progress, e.g. if the sk we are migrating
to went down during the migration. This PR implements this abort
algorithm.

- Closes: https://databricks.atlassian.net/browse/LKB-899
- Closes: https://github.com/neondatabase/neon/issues/12549

## Summary of changes
- Implement `safekeeper_migrate_abort` handler with the algorithm
described in RFC
- Add `timeline-safekeeper-migrate-abort` subcommand to `storcon_cli`
- Add test for the migration abort algorithm.
2025-07-31 12:40:32 +00:00
Mikhail
df4e37b7cc Report timespans for promotion and prewarm (#12730)
- Return sub-actions time spans for prewarm, prewarm offload, and
promotion in http handlers.
- Set `synchronous_standby_names=walproposer` for promoted endpoints.
Otherwise, walproposer on promoted standby ignores reply from safekeeper
and is stuck on lsn COMMIT eternally.
2025-07-31 11:51:19 +00:00
Heikki Linnakangas
b4a63e0a34 Fix how neon.stripe_size option is set in postgresql.conf file (#12776)
Commit 1dce2a9e74 changed how the `neon.pageserver_connstring` setting
is formed, but it messed up setting the `neon.stripe_size` setting so
that it was set twice. That got mixed up during development of the
patch, as commit 7fef4435c1 landed first and was merged incorrectly.
2025-07-31 11:46:57 +00:00
Erik Grinaker
f8fc0bf3c0 neon_local: use doc comments for help texts (#12270)
Clap automatically uses doc comments as help/about texts. Doc comments
are strictly better, since they're also used e.g. for IDE documentation,
and are better formatted.

This patch updates all `neon_local` commands to use doc comments
(courtesy of GPT-o3).
2025-07-31 10:25:33 +00:00
Alexey Kondratov
8fe7596120 chore(compute_tools): Delete unused anon_ext_fn_reassign.sql (#12787)
It's an anon v1 failed launch artifact, I suppose.
2025-07-31 10:11:30 +00:00
Krzysztof Szafrański
f3ee6e818d [proxy] Correctly classify ConnectErrors (#12793)
As is, e.g. quota errors on wake compute are logged as "compute" errors.
2025-07-31 09:53:48 +00:00
Dmitrii Kovalkov
edd60730c8 safekeeper: use last_log_term in mconf switch + choose most advanced sk in pull timeline (#12778)
## Problem
I discovered two bugs corresponding to safekeeper migration, which
together might lead to a data loss during the migration. The second bug
is from a hadron patch and might lead to a data loss during the
safekeeper restore in hadron as well.

1. `switch_membership` returns the current `term` instead of
`last_log_term`. It is used to choose the `sync_position` in the
algorithm, so we might choose the wrong one and break the correctness
guarantees.
2. The current `term` is used to choose the most advanced SK in
`pull_timeline` with higher priority than `flush_lsn`. It is incorrect
because the most advanced safekeeper is the one with the highest
`(last_log_term, flush_lsn)` pair. The compute might bump term on the
least advanced sk, making it the best choice to pull from, and thus
making committed log entries "uncommitted" after `pull_timeline`

Part of https://databricks.atlassian.net/browse/LKB-1017

## Summary of changes
- Return `last_log_term` in `switch_membership`
- Use `(last_log_term, flush_lsn)` as a primary key for choosing the
most advanced sk in `pull_timeline` and deny pulling if the `max_term`
is higher than on the most advanced sk (hadron only)
- Write tests for both cases
- Retry `sync_safekeepers` in `compute_ctl`
- Take into the account the quorum size when calculating `sync_position`
2025-07-31 09:29:25 +00:00
Aleksandr Sarantsev
975b95f4cd Introduce deletion API improvement RFC (#12484)
## Problem

The deletion logic had become difficult to understand and maintain.

## Summary of changes

- Added an RFC detailing proposed improvements to all deletion-related
APIs.

---------

Co-authored-by: Aleksandr Sarantsev <aleksandr.sarantsev@databricks.com>
2025-07-31 08:34:47 +00:00
28 changed files with 2960 additions and 577 deletions

View File

@@ -4,7 +4,6 @@ self-hosted-runner:
- large
- large-arm64
- small
- small-debug
- small-metal
- small-arm64
- unit-perf

View File

@@ -338,3 +338,67 @@ jobs:
- name: Merge and upload coverage data
if: inputs.build-type == 'debug'
uses: ./.github/actions/save-coverage-data
regress-tests:
# Don't run regression tests on debug arm64 builds
if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
permissions:
id-token: write # aws-actions/configure-aws-credentials
contents: read
statuses: write
needs: [ build-neon ]
runs-on: ${{ fromJSON(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large-metal')) }}
container:
image: ${{ inputs.build-tools-image }}
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# for changed limits, see comments on `options:` earlier in this file
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
strategy:
fail-fast: false
matrix: ${{ fromJSON(format('{{"include":{0}}}', inputs.test-cfg)) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
submodules: true
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 75 || 180 }}
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: ${{ inputs.rerun-failed }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540
- name: Merge and upload coverage data
if: |
false &&
inputs.build-type == 'debug' && matrix.pg_version == 'v16'
uses: ./.github/actions/save-coverage-data

File diff suppressed because it is too large Load Diff

View File

@@ -35,6 +35,9 @@ use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration,
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::measured_stream::MeasuredReader;
@@ -1557,6 +1560,41 @@ impl ComputeNode {
Ok(lsn)
}
fn sync_safekeepers_with_retries(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let max_retries = 5;
let mut attempts = 0;
loop {
let result = self.sync_safekeepers(storage_auth_token.clone());
match &result {
Ok(_) => {
if attempts > 0 {
tracing::info!("sync_safekeepers succeeded after {attempts} retries");
}
return result;
}
Err(e) if attempts < max_retries => {
tracing::info!(
"sync_safekeepers failed, will retry (attempt {attempts}): {e:#}"
);
}
Err(err) => {
tracing::warn!(
"sync_safekeepers still failed after {attempts} retries, giving up: {err:?}"
);
return result;
}
}
// sleep and retry
let backoff = exponential_backoff_duration(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
);
std::thread::sleep(backoff);
attempts += 1;
}
}
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
@@ -1592,7 +1630,7 @@ impl ComputeNode {
lsn
} else {
info!("starting safekeepers syncing");
self.sync_safekeepers(pspec.storage_auth_token.clone())
self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?
};
info!("safekeepers synced at LSN {}", lsn);
@@ -2742,7 +2780,7 @@ LIMIT 100",
// 4. We start again and try to prewarm with the state from 2. instead of the previous complete state
if matches!(
prewarm_state,
LfcPrewarmState::Completed
LfcPrewarmState::Completed { .. }
| LfcPrewarmState::NotPrewarmed
| LfcPrewarmState::Skipped
) {

View File

@@ -7,19 +7,11 @@ use http::StatusCode;
use reqwest::Client;
use std::mem::replace;
use std::sync::Arc;
use std::time::Instant;
use tokio::{io::AsyncReadExt, select, spawn};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
#[derive(serde::Serialize, Default)]
pub struct LfcPrewarmStateWithProgress {
#[serde(flatten)]
base: LfcPrewarmState,
total: i32,
prewarmed: i32,
skipped: i32,
}
/// A pair of url and a token to query endpoint storage for LFC prewarm-related tasks
struct EndpointStoragePair {
url: String,
@@ -28,7 +20,7 @@ struct EndpointStoragePair {
const KEY: &str = "lfc_state";
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// endpoint_id is set to None while prewarming from other endpoint, see compute_promote.rs
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
@@ -54,36 +46,8 @@ impl EndpointStoragePair {
}
impl ComputeNode {
// If prewarm failed, we want to get overall number of segments as well as done ones.
// However, this function should be reliable even if querying postgres failed.
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmStateWithProgress {
info!("requesting LFC prewarm state from postgres");
let mut state = LfcPrewarmStateWithProgress::default();
{
state.base = self.state.lock().unwrap().lfc_prewarm_state.clone();
}
let client = match ComputeNode::get_maintenance_client(&self.tokio_conn_conf).await {
Ok(client) => client,
Err(err) => {
error!(%err, "connecting to postgres");
return state;
}
};
let row = match client
.query_one("select * from neon.get_prewarm_info()", &[])
.await
{
Ok(row) => row,
Err(err) => {
error!(%err, "querying LFC prewarm status");
return state;
}
};
state.total = row.try_get(0).unwrap_or_default();
state.prewarmed = row.try_get(1).unwrap_or_default();
state.skipped = row.try_get(2).unwrap_or_default();
state
pub async fn lfc_prewarm_state(&self) -> LfcPrewarmState {
self.state.lock().unwrap().lfc_prewarm_state.clone()
}
pub fn lfc_offload_state(&self) -> LfcOffloadState {
@@ -133,7 +97,6 @@ impl ComputeNode {
}
/// Request LFC state from endpoint storage and load corresponding pages into Postgres.
/// Returns a result with `false` if the LFC state is not found in endpoint storage.
async fn prewarm_impl(
&self,
from_endpoint: Option<String>,
@@ -148,6 +111,7 @@ impl ComputeNode {
fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint"));
info!(%url, "requesting LFC state from endpoint storage");
let mut now = Instant::now();
let request = Client::new().get(&url).bearer_auth(storage_token);
let response = select! {
_ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled),
@@ -160,6 +124,8 @@ impl ComputeNode {
StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped),
status => bail!("{status} querying endpoint storage"),
}
let state_download_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let mut uncompressed = Vec::new();
let lfc_state = select! {
@@ -174,6 +140,8 @@ impl ComputeNode {
read = decoder.read_to_end(&mut uncompressed) => read
}
.context("decoding LFC state")?;
let uncompress_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let uncompressed_len = uncompressed.len();
info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}");
@@ -196,15 +164,34 @@ impl ComputeNode {
}
.context("loading LFC state into postgres")
.map(|_| ())?;
let prewarm_time_ms = now.elapsed().as_millis() as u32;
Ok(LfcPrewarmState::Completed)
let row = client
.query_one("select * from neon.get_prewarm_info()", &[])
.await
.context("querying prewarm info")?;
let total = row.try_get(0).unwrap_or_default();
let prewarmed = row.try_get(1).unwrap_or_default();
let skipped = row.try_get(2).unwrap_or_default();
Ok(LfcPrewarmState::Completed {
total,
prewarmed,
skipped,
state_download_time_ms,
uncompress_time_ms,
prewarm_time_ms,
})
}
/// If offload request is ongoing, return false, true otherwise
pub fn offload_lfc(self: &Arc<Self>) -> bool {
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
if matches!(
replace(state, LfcOffloadState::Offloading),
LfcOffloadState::Offloading
) {
return false;
}
}
@@ -216,7 +203,10 @@ impl ComputeNode {
pub async fn offload_lfc_async(self: &Arc<Self>) {
{
let state = &mut self.state.lock().unwrap().lfc_offload_state;
if replace(state, LfcOffloadState::Offloading) == LfcOffloadState::Offloading {
if matches!(
replace(state, LfcOffloadState::Offloading),
LfcOffloadState::Offloading
) {
return;
}
}
@@ -234,7 +224,6 @@ impl ComputeNode {
LfcOffloadState::Failed { error }
}
};
self.state.lock().unwrap().lfc_offload_state = state;
}
@@ -242,6 +231,7 @@ impl ComputeNode {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from Postgres");
let mut now = Instant::now();
let row = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
@@ -255,25 +245,36 @@ impl ComputeNode {
info!(%url, "empty LFC state, not exporting");
return Ok(LfcOffloadState::Skipped);
};
let state_query_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let mut compressed = Vec::new();
ZstdEncoder::new(state)
.read_to_end(&mut compressed)
.await
.context("compressing LFC state")?;
let compress_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
let compressed_len = compressed.len();
info!(%url, "downloaded LFC state, compressed size {compressed_len}, writing to endpoint storage");
info!(%url, "downloaded LFC state, compressed size {compressed_len}");
let request = Client::new().put(url).bearer_auth(token).body(compressed);
match request.send().await {
Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed),
Ok(res) => bail!(
"Request to endpoint storage failed with status: {}",
res.status()
),
Err(err) => Err(err).context("writing to endpoint storage"),
let response = request
.send()
.await
.context("writing to endpoint storage")?;
let state_upload_time_ms = now.elapsed().as_millis() as u32;
let status = response.status();
if status != StatusCode::OK {
bail!("request to endpoint storage failed: {status}");
}
Ok(LfcOffloadState::Completed {
compress_time_ms,
state_query_time_ms,
state_upload_time_ms,
})
}
pub fn cancel_prewarm(self: &Arc<Self>) {

View File

@@ -1,32 +1,24 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use anyhow::{Context, bail};
use compute_api::responses::{LfcPrewarmState, PromoteConfig, PromoteState};
use compute_api::spec::ComputeMode;
use itertools::Itertools;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use std::time::Instant;
use tracing::info;
use utils::lsn::Lsn;
impl ComputeNode {
/// Returns only when promote fails or succeeds. If a network error occurs
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Returns only when promote fails or succeeds. If http client calling this function
/// disconnects, this does not stop promotion, and subsequent calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
/// Has a failpoint "compute-promotion"
pub async fn promote(self: &Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let cloned = self.clone();
let promote_fn = async move || {
let Err(err) = cloned.promote_impl(cfg).await else {
return PromoteState::Completed;
};
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: format!("{err:#}"),
pub async fn promote(self: &std::sync::Arc<Self>, cfg: PromoteConfig) -> PromoteState {
let this = self.clone();
let promote_fn = async move || match this.promote_impl(cfg).await {
Ok(state) => state,
Err(err) => {
tracing::error!(%err, "promoting replica");
let error = format!("{err:#}");
PromoteState::Failed { error }
}
};
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move { tx.send(promote_fn().await) });
@@ -34,36 +26,31 @@ impl ComputeNode {
};
let mut task;
// self.state is unlocked after block ends so we lock it in promote_impl
// and task.changed() is reached
// promote_impl locks self.state so we need to unlock it before calling task.changed()
{
task = self
.state
.lock()
.unwrap()
.promote_state
.get_or_insert_with(start_promotion)
.clone()
let promote_state = &mut self.state.lock().unwrap().promote_state;
task = promote_state.get_or_insert_with(start_promotion).clone()
}
if task.changed().await.is_err() {
let error = "promote sender dropped".to_string();
return PromoteState::Failed { error };
}
task.changed().await.expect("promote sender dropped");
task.borrow().clone()
}
async fn promote_impl(&self, mut cfg: PromoteConfig) -> Result<()> {
async fn promote_impl(&self, cfg: PromoteConfig) -> anyhow::Result<PromoteState> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
if *mode != ComputeMode::Replica {
bail!("{} is not replica", mode.to_type_str());
if *mode != compute_api::spec::ComputeMode::Replica {
bail!("compute mode \"{}\" is not replica", mode.to_type_str());
}
// we don't need to query Postgres so not self.lfc_prewarm_state()
match &state.lfc_prewarm_state {
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
bail!("prewarm not requested or pending")
status @ (LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming) => {
bail!("compute {status}")
}
LfcPrewarmState::Failed { error } => {
tracing::warn!(%error, "replica prewarm failed")
tracing::warn!(%error, "compute prewarm failed")
}
_ => {}
}
@@ -72,9 +59,10 @@ impl ComputeNode {
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let mut now = Instant::now();
let primary_lsn = cfg.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
let mut standby_lsn = utils::lsn::Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
@@ -82,16 +70,18 @@ impl ComputeNode {
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
last_wal_replay_lsn = lsn.into();
if last_wal_replay_lsn >= primary_lsn {
standby_lsn = lsn.into();
if standby_lsn >= primary_lsn {
break;
}
info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
info!(%standby_lsn, %primary_lsn, "catching up, try {i}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if last_wal_replay_lsn < primary_lsn {
if standby_lsn < primary_lsn {
bail!("didn't catch up with primary in {RETRIES} retries");
}
let lsn_wait_time_ms = now.elapsed().as_millis() as u32;
now = Instant::now();
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
@@ -102,27 +92,33 @@ impl ComputeNode {
.query(&safekeepers_sql, &[])
.await
.context("setting safekeepers")?;
client
.query(
"ALTER SYSTEM SET synchronous_standby_names=walproposer",
&[],
)
.await
.context("setting synchronous_standby_names")?;
client
.query("SELECT pg_catalog.pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
#[cfg(feature = "testing")]
fail::fail_point!("compute-promotion", |_| {
bail!("promotion configured to fail because of a failpoint")
});
fail::fail_point!("compute-promotion", |_| bail!(
"compute-promotion failpoint"
));
let row = client
.query_one("SELECT * FROM pg_catalog.pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {
bail!("pg_promote() returned false");
bail!("pg_promote() failed");
}
let pg_promote_time_ms = now.elapsed().as_millis() as u32;
let now = Instant::now();
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let row = client
.query_one("SHOW transaction_read_only", &[])
.await
@@ -131,36 +127,47 @@ impl ComputeNode {
bail!("replica in read only mode after promotion");
}
// Already checked validity in http handler
#[allow(unused_mut)]
let mut new_pspec = crate::compute::ParsedSpec::try_from(cfg.spec).expect("invalid spec");
{
let mut state = self.state.lock().unwrap();
let spec = &mut state.pspec.as_mut().unwrap().spec;
spec.mode = ComputeMode::Primary;
let new_conf = cfg.spec.cluster.postgresql_conf.as_mut().unwrap();
let existing_conf = spec.cluster.postgresql_conf.as_ref().unwrap();
Self::merge_spec(new_conf, existing_conf);
// Local setup has different ports for pg process (port=) for primary and secondary.
// Primary is stopped so we need secondary's "port" value
#[cfg(feature = "testing")]
{
let old_spec = &state.pspec.as_ref().unwrap().spec;
let Some(old_conf) = old_spec.cluster.postgresql_conf.as_ref() else {
bail!("pspec.spec.cluster.postgresql_conf missing for endpoint");
};
let set: std::collections::HashMap<&str, &str> = old_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
let Some(new_conf) = new_pspec.spec.cluster.postgresql_conf.as_mut() else {
bail!("pspec.spec.cluster.postgresql_conf missing for supplied config");
};
new_conf.push_str(&format!("port={}\n", set["port"]));
}
tracing::debug!("applied spec: {:#?}", new_pspec.spec);
if self.params.lakebase_mode {
ComputeNode::set_spec(&self.params, &mut state, new_pspec);
} else {
state.pspec = Some(new_pspec);
}
}
info!("applied new spec, reconfiguring as primary");
self.reconfigure()
}
self.reconfigure()?;
let reconfigure_time_ms = now.elapsed().as_millis() as u32;
/// Merge old and new Postgres conf specs to apply on secondary.
/// Change new spec's port and safekeepers since they are supplied
/// differenly
fn merge_spec(new_conf: &mut String, existing_conf: &str) {
let mut new_conf_set: HashMap<&str, &str> = new_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.remove("neon.safekeepers");
let existing_conf_set: HashMap<&str, &str> = existing_conf
.split_terminator('\n')
.map(|e| e.split_once("=").expect("invalid item"))
.collect();
new_conf_set.insert("port", existing_conf_set["port"]);
*new_conf = new_conf_set
.iter()
.map(|(k, v)| format!("{k}={v}"))
.join("\n");
Ok(PromoteState::Completed {
lsn_wait_time_ms,
pg_promote_time_ms,
reconfigure_time_ms,
})
}
}

View File

@@ -65,14 +65,19 @@ pub fn write_postgres_conf(
writeln!(file, "{conf}")?;
}
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
writeln!(file)?;
if let Some(conninfo) = &spec.pageserver_connection_info {
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = conninfo.stripe_size {
writeln!(
file,
"# from compute spec's pageserver_connection_info.stripe_size field"
)?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
let mut libpq_urls: Option<Vec<String>> = Some(Vec::new());
let num_shards = if conninfo.shard_count.0 == 0 {
1 // unsharded, treat it as a single shard
@@ -110,7 +115,7 @@ pub fn write_postgres_conf(
if let Some(libpq_urls) = libpq_urls {
writeln!(
file,
"# derived from compute spec's pageserver_conninfo field"
"# derived from compute spec's pageserver_connection_info field"
)?;
writeln!(
file,
@@ -120,24 +125,16 @@ pub fn write_postgres_conf(
} else {
writeln!(file, "# no neon.pageserver_connstring")?;
}
if let Some(stripe_size) = conninfo.stripe_size {
writeln!(
file,
"# from compute spec's pageserver_conninfo.stripe_size field"
)?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
} else {
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "# from compute spec's pageserver_connstring field")?;
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
// Stripe size GUC should be defined prior to connection string
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "# from compute spec's shard_stripe_size field")?;
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "# from compute spec's pageserver_connstring field")?;
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
}
if !spec.safekeeper_connstrings.is_empty() {

View File

@@ -617,9 +617,6 @@ components:
type: object
required:
- status
- total
- prewarmed
- skipped
properties:
status:
description: LFC prewarm status
@@ -637,6 +634,15 @@ components:
skipped:
description: Pages processed but not prewarmed
type: integer
state_download_time_ms:
description: Time it takes to download LFC state to compute
type: integer
uncompress_time_ms:
description: Time it takes to uncompress LFC state
type: integer
prewarm_time_ms:
description: Time it takes to prewarm LFC state in Postgres
type: integer
LfcOffloadState:
type: object
@@ -650,6 +656,16 @@ components:
error:
description: LFC offload error, if any
type: string
state_query_time_ms:
description: Time it takes to get LFC state from Postgres
type: integer
compress_time_ms:
description: Time it takes to compress LFC state
type: integer
state_upload_time_ms:
description: Time it takes to upload LFC state to endpoint storage
type: integer
PromoteState:
type: object
@@ -663,6 +679,15 @@ components:
error:
description: Promote error, if any
type: string
lsn_wait_time_ms:
description: Time it takes for secondary to catch up with primary WAL flush LSN
type: integer
pg_promote_time_ms:
description: Time it takes to call pg_promote on secondary
type: integer
reconfigure_time_ms:
description: Time it takes to reconfigure promoted secondary
type: integer
SetRoleGrantsRequest:
type: object

View File

@@ -1,12 +1,11 @@
use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::LfcOffloadState;
use compute_api::responses::{LfcOffloadState, LfcPrewarmState};
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmStateWithProgress> {
pub(in crate::http) async fn prewarm_state(compute: Compute) -> Json<LfcPrewarmState> {
Json(compute.lfc_prewarm_state().await)
}

View File

@@ -1,11 +1,22 @@
use crate::http::JsonResponse;
use axum::extract::Json;
use compute_api::responses::PromoteConfig;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Json(cfg): Json<compute_api::responses::PromoteConfig>,
Json(cfg): Json<PromoteConfig>,
) -> axum::response::Response {
// Return early at the cost of extra parsing spec
let pspec = match crate::compute::ParsedSpec::try_from(cfg.spec) {
Ok(p) => p,
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
};
let cfg = PromoteConfig {
spec: pspec.spec,
wal_flush_lsn: cfg.wal_flush_lsn,
};
let state = compute.promote(cfg).await;
if let compute_api::responses::PromoteState::Failed { error: _ } = state {
return JsonResponse::create_response(StatusCode::INTERNAL_SERVER_ERROR, state);

View File

@@ -1,13 +0,0 @@
DO $$
DECLARE
query varchar;
BEGIN
FOR query IN
SELECT pg_catalog.format('ALTER FUNCTION %I(%s) OWNER TO {db_owner};', p.oid::regproc, pg_catalog.pg_get_function_identity_arguments(p.oid))
FROM pg_catalog.pg_proc p
WHERE p.pronamespace OPERATOR(pg_catalog.=) 'anon'::regnamespace::oid
LOOP
EXECUTE query;
END LOOP;
END
$$;

View File

@@ -71,8 +71,9 @@ const DEFAULT_PG_VERSION_NUM: &str = "17";
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
/// Neon CLI.
#[derive(clap::Parser)]
#[command(version = GIT_VERSION, about, name = "Neon CLI")]
#[command(version = GIT_VERSION, name = "Neon CLI")]
struct Cli {
#[command(subcommand)]
command: NeonLocalCmd,
@@ -107,30 +108,31 @@ enum NeonLocalCmd {
Stop(StopCmdArgs),
}
/// Initialize a new Neon repository, preparing configs for services to start with.
#[derive(clap::Args)]
#[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")]
struct InitCmdArgs {
#[clap(long, help("How many pageservers to create (default 1)"))]
/// How many pageservers to create (default 1).
#[clap(long)]
num_pageservers: Option<u16>,
#[clap(long)]
config: Option<PathBuf>,
#[clap(long, help("Force initialization even if the repository is not empty"))]
/// Force initialization even if the repository is not empty.
#[clap(long, default_value = "must-not-exist")]
#[arg(value_parser)]
#[clap(default_value = "must-not-exist")]
force: InitForceMode,
}
/// Start pageserver and safekeepers.
#[derive(clap::Args)]
#[clap(about = "Start pageserver and safekeepers")]
struct StartCmdArgs {
#[clap(long = "start-timeout", default_value = "10s")]
timeout: humantime::Duration,
}
/// Stop pageserver and safekeepers.
#[derive(clap::Args)]
#[clap(about = "Stop pageserver and safekeepers")]
struct StopCmdArgs {
#[arg(value_enum)]
#[clap(long, default_value_t = StopMode::Fast)]
@@ -143,8 +145,8 @@ enum StopMode {
Immediate,
}
/// Manage tenants.
#[derive(clap::Subcommand)]
#[clap(about = "Manage tenants")]
enum TenantCmd {
List,
Create(TenantCreateCmdArgs),
@@ -155,38 +157,36 @@ enum TenantCmd {
#[derive(clap::Args)]
struct TenantCreateCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: Option<TenantId>,
#[clap(
long,
help = "Use a specific timeline id when creating a tenant and its initial timeline"
)]
/// Use a specific timeline id when creating a tenant and its initial timeline.
#[clap(long)]
timeline_id: Option<TimelineId>,
#[clap(short = 'c')]
config: Vec<String>,
/// Postgres version to use for the initial timeline.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long, help = "Postgres version to use for the initial timeline")]
#[clap(long)]
pg_version: PgMajorVersion,
#[clap(
long,
help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified"
)]
/// Use this tenant in future CLI commands where tenant_id is needed, but not specified.
#[clap(long)]
set_default: bool,
#[clap(long, help = "Number of shards in the new tenant")]
/// Number of shards in the new tenant.
#[clap(long)]
#[arg(default_value_t = 0)]
shard_count: u8,
#[clap(long, help = "Sharding stripe size in pages")]
/// Sharding stripe size in pages.
#[clap(long)]
shard_stripe_size: Option<u32>,
#[clap(long, help = "Placement policy shards in this tenant")]
/// Placement policy shards in this tenant.
#[clap(long)]
#[arg(value_parser = parse_placement_policy)]
placement_policy: Option<PlacementPolicy>,
}
@@ -195,44 +195,35 @@ fn parse_placement_policy(s: &str) -> anyhow::Result<PlacementPolicy> {
Ok(serde_json::from_str::<PlacementPolicy>(s)?)
}
/// Set a particular tenant as default in future CLI commands where tenant_id is needed, but not
/// specified.
#[derive(clap::Args)]
#[clap(
about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified"
)]
struct TenantSetDefaultCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: TenantId,
}
#[derive(clap::Args)]
struct TenantConfigCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: Option<TenantId>,
#[clap(short = 'c')]
config: Vec<String>,
}
/// Import a tenant that is present in remote storage, and create branches for its timelines.
#[derive(clap::Args)]
#[clap(
about = "Import a tenant that is present in remote storage, and create branches for its timelines"
)]
struct TenantImportCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: TenantId,
}
/// Manage timelines.
#[derive(clap::Subcommand)]
#[clap(about = "Manage timelines")]
enum TimelineCmd {
List(TimelineListCmdArgs),
Branch(TimelineBranchCmdArgs),
@@ -240,98 +231,87 @@ enum TimelineCmd {
Import(TimelineImportCmdArgs),
}
/// List all timelines available to this pageserver.
#[derive(clap::Args)]
#[clap(about = "List all timelines available to this pageserver")]
struct TimelineListCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_shard_id: Option<TenantShardId>,
}
/// Create a new timeline, branching off from another timeline.
#[derive(clap::Args)]
#[clap(about = "Create a new timeline, branching off from another timeline")]
struct TimelineBranchCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: Option<TenantId>,
#[clap(long, help = "New timeline's ID")]
/// New timeline's ID, as a 32-byte hexadecimal string.
#[clap(long)]
timeline_id: Option<TimelineId>,
#[clap(long, help = "Human-readable alias for the new timeline")]
/// Human-readable alias for the new timeline.
#[clap(long)]
branch_name: String,
#[clap(
long,
help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name."
)]
/// Use last Lsn of another timeline (and its data) as base when creating the new timeline. The
/// timeline gets resolved by its branch name.
#[clap(long)]
ancestor_branch_name: Option<String>,
#[clap(
long,
help = "When using another timeline as base, use a specific Lsn in it instead of the latest one"
)]
/// When using another timeline as base, use a specific Lsn in it instead of the latest one.
#[clap(long)]
ancestor_start_lsn: Option<Lsn>,
}
/// Create a new blank timeline.
#[derive(clap::Args)]
#[clap(about = "Create a new blank timeline")]
struct TimelineCreateCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: Option<TenantId>,
#[clap(long, help = "New timeline's ID")]
/// New timeline's ID, as a 32-byte hexadecimal string.
#[clap(long)]
timeline_id: Option<TimelineId>,
#[clap(long, help = "Human-readable alias for the new timeline")]
/// Human-readable alias for the new timeline.
#[clap(long)]
branch_name: String,
/// Postgres version.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long, help = "Postgres version")]
#[clap(long)]
pg_version: PgMajorVersion,
}
/// Import a timeline from a basebackup directory.
#[derive(clap::Args)]
#[clap(about = "Import timeline from a basebackup directory")]
struct TimelineImportCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: Option<TenantId>,
#[clap(long, help = "New timeline's ID")]
/// New timeline's ID, as a 32-byte hexadecimal string.
#[clap(long)]
timeline_id: TimelineId,
#[clap(long, help = "Human-readable alias for the new timeline")]
/// Human-readable alias for the new timeline.
#[clap(long)]
branch_name: String,
#[clap(long, help = "Basebackup tarfile to import")]
/// Basebackup tarfile to import.
#[clap(long)]
base_tarfile: PathBuf,
#[clap(long, help = "Lsn the basebackup starts at")]
/// LSN the basebackup starts at.
#[clap(long)]
base_lsn: Lsn,
#[clap(long, help = "Wal to add after base")]
/// WAL to add after base.
#[clap(long)]
wal_tarfile: Option<PathBuf>,
#[clap(long, help = "Lsn the basebackup ends at")]
/// LSN the basebackup ends at.
#[clap(long)]
end_lsn: Option<Lsn>,
/// Postgres version of the basebackup being imported.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long, help = "Postgres version of the backup being imported")]
#[clap(long)]
pg_version: PgMajorVersion,
}
/// Manage pageservers.
#[derive(clap::Subcommand)]
#[clap(about = "Manage pageservers")]
enum PageserverCmd {
Status(PageserverStatusCmdArgs),
Start(PageserverStartCmdArgs),
@@ -339,223 +319,202 @@ enum PageserverCmd {
Restart(PageserverRestartCmdArgs),
}
/// Show status of a local pageserver.
#[derive(clap::Args)]
#[clap(about = "Show status of a local pageserver")]
struct PageserverStatusCmdArgs {
#[clap(long = "id", help = "pageserver id")]
/// Pageserver ID.
#[clap(long = "id")]
pageserver_id: Option<NodeId>,
}
/// Start local pageserver.
#[derive(clap::Args)]
#[clap(about = "Start local pageserver")]
struct PageserverStartCmdArgs {
#[clap(long = "id", help = "pageserver id")]
/// Pageserver ID.
#[clap(long = "id")]
pageserver_id: Option<NodeId>,
#[clap(short = 't', long, help = "timeout until we fail the command")]
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop local pageserver.
#[derive(clap::Args)]
#[clap(about = "Stop local pageserver")]
struct PageserverStopCmdArgs {
#[clap(long = "id", help = "pageserver id")]
/// Pageserver ID.
#[clap(long = "id")]
pageserver_id: Option<NodeId>,
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
/// If 'immediate', don't flush repository data at shutdown
#[clap(short = 'm')]
#[arg(value_enum, default_value = "fast")]
stop_mode: StopMode,
}
/// Restart local pageserver.
#[derive(clap::Args)]
#[clap(about = "Restart local pageserver")]
struct PageserverRestartCmdArgs {
#[clap(long = "id", help = "pageserver id")]
/// Pageserver ID.
#[clap(long = "id")]
pageserver_id: Option<NodeId>,
#[clap(short = 't', long, help = "timeout until we fail the command")]
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Manage storage controller.
#[derive(clap::Subcommand)]
#[clap(about = "Manage storage controller")]
enum StorageControllerCmd {
Start(StorageControllerStartCmdArgs),
Stop(StorageControllerStopCmdArgs),
}
/// Start storage controller.
#[derive(clap::Args)]
#[clap(about = "Start storage controller")]
struct StorageControllerStartCmdArgs {
#[clap(short = 't', long, help = "timeout until we fail the command")]
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
#[clap(
long,
help = "Identifier used to distinguish storage controller instances"
)]
/// Identifier used to distinguish storage controller instances.
#[clap(long)]
#[arg(default_value_t = 1)]
instance_id: u8,
#[clap(
long,
help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)"
)]
/// Base port for the storage controller instance identified by instance-id (defaults to
/// pageserver cplane api).
#[clap(long)]
base_port: Option<u16>,
#[clap(
long,
help = "Whether the storage controller should handle pageserver-reported local disk loss events."
)]
/// Whether the storage controller should handle pageserver-reported local disk loss events.
#[clap(long)]
handle_ps_local_disk_loss: Option<bool>,
}
/// Stop storage controller.
#[derive(clap::Args)]
#[clap(about = "Stop storage controller")]
struct StorageControllerStopCmdArgs {
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
/// If 'immediate', don't flush repository data at shutdown
#[clap(short = 'm')]
#[arg(value_enum, default_value = "fast")]
stop_mode: StopMode,
#[clap(
long,
help = "Identifier used to distinguish storage controller instances"
)]
/// Identifier used to distinguish storage controller instances.
#[clap(long)]
#[arg(default_value_t = 1)]
instance_id: u8,
}
/// Manage storage broker.
#[derive(clap::Subcommand)]
#[clap(about = "Manage storage broker")]
enum StorageBrokerCmd {
Start(StorageBrokerStartCmdArgs),
Stop(StorageBrokerStopCmdArgs),
}
/// Start broker.
#[derive(clap::Args)]
#[clap(about = "Start broker")]
struct StorageBrokerStartCmdArgs {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
/// Timeout until we fail the command.
#[clap(short = 't', long, default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop broker.
#[derive(clap::Args)]
#[clap(about = "stop broker")]
struct StorageBrokerStopCmdArgs {
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
/// If 'immediate', don't flush repository data on shutdown.
#[clap(short = 'm')]
#[arg(value_enum, default_value = "fast")]
stop_mode: StopMode,
}
/// Manage safekeepers.
#[derive(clap::Subcommand)]
#[clap(about = "Manage safekeepers")]
enum SafekeeperCmd {
Start(SafekeeperStartCmdArgs),
Stop(SafekeeperStopCmdArgs),
Restart(SafekeeperRestartCmdArgs),
}
/// Manage object storage.
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum EndpointStorageCmd {
Start(EndpointStorageStartCmd),
Stop(EndpointStorageStopCmd),
}
/// Start object storage.
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct EndpointStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop object storage.
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct EndpointStorageStopCmd {
/// If 'immediate', don't flush repository data on shutdown.
#[clap(short = 'm')]
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
stop_mode: StopMode,
}
/// Start local safekeeper.
#[derive(clap::Args)]
#[clap(about = "Start local safekeeper")]
struct SafekeeperStartCmdArgs {
#[clap(help = "safekeeper id")]
/// Safekeeper ID.
#[arg(default_value_t = NodeId(1))]
id: NodeId,
#[clap(
short = 'e',
long = "safekeeper-extra-opt",
help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
)]
/// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
#[clap(short = 'e', long = "safekeeper-extra-opt")]
extra_opt: Vec<String>,
#[clap(short = 't', long, help = "timeout until we fail the command")]
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Stop local safekeeper.
#[derive(clap::Args)]
#[clap(about = "Stop local safekeeper")]
struct SafekeeperStopCmdArgs {
#[clap(help = "safekeeper id")]
/// Safekeeper ID.
#[arg(default_value_t = NodeId(1))]
id: NodeId,
/// If 'immediate', don't flush repository data on shutdown.
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
#[clap(short = 'm')]
stop_mode: StopMode,
}
/// Restart local safekeeper.
#[derive(clap::Args)]
#[clap(about = "Restart local safekeeper")]
struct SafekeeperRestartCmdArgs {
#[clap(help = "safekeeper id")]
/// Safekeeper ID.
#[arg(default_value_t = NodeId(1))]
id: NodeId,
/// If 'immediate', don't flush repository data on shutdown.
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
help = "If 'immediate', don't flush repository data at shutdown"
)]
#[clap(short = 'm')]
stop_mode: StopMode,
#[clap(
short = 'e',
long = "safekeeper-extra-opt",
help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo"
)]
/// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo.
#[clap(short = 'e', long = "safekeeper-extra-opt")]
extra_opt: Vec<String>,
#[clap(short = 't', long, help = "timeout until we fail the command")]
/// Timeout until we fail the command.
#[clap(short = 't', long)]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
}
/// Manage Postgres instances.
#[derive(clap::Subcommand)]
#[clap(about = "Manage Postgres instances")]
enum EndpointCmd {
List(EndpointListCmdArgs),
Create(EndpointCreateCmdArgs),
@@ -567,33 +526,27 @@ enum EndpointCmd {
GenerateJwt(EndpointGenerateJwtCmdArgs),
}
/// List endpoints.
#[derive(clap::Args)]
#[clap(about = "List endpoints")]
struct EndpointListCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_shard_id: Option<TenantShardId>,
}
/// Create a compute endpoint.
#[derive(clap::Args)]
#[clap(about = "Create a compute endpoint")]
struct EndpointCreateCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long = "tenant-id")]
tenant_id: Option<TenantId>,
#[clap(help = "Postgres endpoint id")]
/// Postgres endpoint ID.
endpoint_id: Option<String>,
#[clap(long, help = "Name of the branch the endpoint will run on")]
/// Name of the branch the endpoint will run on.
#[clap(long)]
branch_name: Option<String>,
#[clap(
long,
help = "Specify Lsn on the timeline to start from. By default, end of the timeline would be used"
)]
/// Specify LSN on the timeline to start from. By default, end of the timeline would be used.
#[clap(long)]
lsn: Option<Lsn>,
#[clap(long)]
pg_port: Option<u16>,
@@ -604,16 +557,13 @@ struct EndpointCreateCmdArgs {
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
#[clap(
long,
help = "Don't do basebackup, create endpoint directory with only config files",
action = clap::ArgAction::Set,
default_value_t = false
)]
/// Don't do basebackup, create endpoint directory with only config files.
#[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
config_only: bool,
/// Postgres version.
#[arg(default_value = DEFAULT_PG_VERSION_NUM)]
#[clap(long, help = "Postgres version")]
#[clap(long)]
pg_version: PgMajorVersion,
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
@@ -624,170 +574,140 @@ struct EndpointCreateCmdArgs {
#[clap(long)]
grpc: bool,
#[clap(
long,
help = "If set, the node will be a hot replica on the specified timeline",
action = clap::ArgAction::Set,
default_value_t = false
)]
/// If set, the node will be a hot replica on the specified timeline.
#[clap(long, action = clap::ArgAction::Set, default_value_t = false)]
hot_standby: bool,
#[clap(long, help = "If set, will set up the catalog for neon_superuser")]
/// If set, will set up the catalog for neon_superuser.
#[clap(long)]
update_catalog: bool,
#[clap(
long,
help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
)]
/// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
/// useful for tests.
#[clap(long)]
allow_multiple: bool,
/// Only allow changing it on creation
#[clap(long, help = "Name of the privileged role for the endpoint")]
/// Name of the privileged role for the endpoint.
// Only allow changing it on creation.
#[clap(long)]
privileged_role_name: Option<String>,
}
/// Start Postgres. If the endpoint doesn't exist yet, it is created.
#[derive(clap::Args)]
#[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")]
struct EndpointStartCmdArgs {
#[clap(help = "Postgres endpoint id")]
/// Postgres endpoint ID.
endpoint_id: String,
/// Pageserver ID.
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
#[clap(
long,
help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
)]
/// Safekeepers membership generation to prefix neon.safekeepers with.
#[clap(long)]
safekeepers_generation: Option<u32>,
#[clap(
long,
help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override."
)]
/// List of safekeepers endpoint will talk to.
#[clap(long)]
safekeepers: Option<String>,
#[clap(
long,
help = "Configure the remote extensions storage proxy gateway URL to request for extensions.",
alias = "remote-ext-config"
)]
/// Configure the remote extensions storage proxy gateway URL to request for extensions.
#[clap(long, alias = "remote-ext-config")]
remote_ext_base_url: Option<String>,
#[clap(
long,
help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`"
)]
/// If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`
#[clap(long)]
create_test_user: bool,
#[clap(
long,
help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests."
)]
/// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but
/// useful for tests.
#[clap(long)]
allow_multiple: bool,
#[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
/// Timeout until we fail the command.
#[clap(short = 't', long, value_parser= humantime::parse_duration)]
#[arg(default_value = "90s")]
start_timeout: Duration,
#[clap(
long,
help = "Download LFC cache from endpoint storage on endpoint startup",
default_value = "false"
)]
/// Download LFC cache from endpoint storage on endpoint startup
#[clap(long, default_value = "false")]
autoprewarm: bool,
#[clap(long, help = "Upload LFC cache to endpoint storage periodically")]
/// Upload LFC cache to endpoint storage periodically
#[clap(long)]
offload_lfc_interval_seconds: Option<std::num::NonZeroU64>,
#[clap(
long,
help = "Run in development mode, skipping VM-specific operations like process termination",
action = clap::ArgAction::SetTrue
)]
/// Run in development mode, skipping VM-specific operations like process termination
#[clap(long, action = clap::ArgAction::SetTrue)]
dev: bool,
}
/// Reconfigure an endpoint.
#[derive(clap::Args)]
#[clap(about = "Reconfigure an endpoint")]
struct EndpointReconfigureCmdArgs {
#[clap(
long = "tenant-id",
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant id. Represented as a hexadecimal string 32 symbols length
#[clap(long = "tenant-id")]
tenant_id: Option<TenantId>,
#[clap(help = "Postgres endpoint id")]
/// Postgres endpoint ID.
endpoint_id: String,
/// Pageserver ID.
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
#[clap(long)]
safekeepers: Option<String>,
}
/// Refresh the endpoint's configuration by forcing it reload it's spec
#[derive(clap::Args)]
#[clap(about = "Refresh the endpoint's configuration by forcing it reload it's spec")]
struct EndpointRefreshConfigurationArgs {
#[clap(help = "Postgres endpoint id")]
/// Postgres endpoint id
endpoint_id: String,
}
/// Stop an endpoint.
#[derive(clap::Args)]
#[clap(about = "Stop an endpoint")]
struct EndpointStopCmdArgs {
#[clap(help = "Postgres endpoint id")]
/// Postgres endpoint ID.
endpoint_id: String,
#[clap(
long,
help = "Also delete data directory (now optional, should be default in future)"
)]
/// Also delete data directory (now optional, should be default in future).
#[clap(long)]
destroy: bool,
#[clap(long, help = "Postgres shutdown mode")]
/// Postgres shutdown mode, passed to `pg_ctl -m <mode>`.
#[clap(long)]
#[clap(default_value = "fast")]
mode: EndpointTerminateMode,
}
/// Update the pageservers in the spec file of the compute endpoint
#[derive(clap::Args)]
#[clap(about = "Update the pageservers in the spec file of the compute endpoint")]
struct EndpointUpdatePageserversCmdArgs {
#[clap(help = "Postgres endpoint id")]
/// Postgres endpoint id
endpoint_id: String,
#[clap(short = 'p', long, help = "Specified pageserver id")]
/// Specified pageserver id
#[clap(short = 'p', long)]
pageserver_id: Option<NodeId>,
}
/// Generate a JWT for an endpoint.
#[derive(clap::Args)]
#[clap(about = "Generate a JWT for an endpoint")]
struct EndpointGenerateJwtCmdArgs {
#[clap(help = "Postgres endpoint id")]
/// Postgres endpoint ID.
endpoint_id: String,
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
/// Scope to generate the JWT with.
#[clap(short = 's', long, value_parser = ComputeClaimsScope::from_str)]
scope: Option<ComputeClaimsScope>,
}
/// Manage neon_local branch name mappings.
#[derive(clap::Subcommand)]
#[clap(about = "Manage neon_local branch name mappings")]
enum MappingsCmd {
Map(MappingsMapCmdArgs),
}
/// Create new mapping which cannot exist already.
#[derive(clap::Args)]
#[clap(about = "Create new mapping which cannot exist already")]
struct MappingsMapCmdArgs {
#[clap(
long,
help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
)]
/// Tenant ID, as a 32-byte hexadecimal string.
#[clap(long)]
tenant_id: TenantId,
#[clap(
long,
help = "Timeline id. Represented as a hexadecimal string 32 symbols length"
)]
/// Timeline ID, as a 32-byte hexadecimal string.
#[clap(long)]
timeline_id: TimelineId,
#[clap(long, help = "Branch name to give to the timeline")]
/// Branch name to give to the timeline.
#[clap(long)]
branch_name: String,
}

View File

@@ -303,6 +303,13 @@ enum Command {
#[arg(long, required = true, value_delimiter = ',')]
new_sk_set: Vec<NodeId>,
},
/// Abort ongoing safekeeper migration.
TimelineSafekeeperMigrateAbort {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
},
}
#[derive(Parser)]
@@ -1396,6 +1403,17 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::TimelineSafekeeperMigrateAbort {
tenant_id,
timeline_id,
} => {
let path =
format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort");
storcon_client
.dispatch::<(), ()>(Method::POST, path, None)
.await?;
}
}
Ok(())

View File

@@ -0,0 +1,246 @@
# Node deletion API improvement
Created on 2025-07-07
Implemented on _TBD_
## Summary
This RFC describes improvements to the storage controller API for gracefully deleting pageserver
nodes.
## Motivation
The basic node deletion API introduced in [#8226](https://github.com/neondatabase/neon/issues/8333)
has several limitations:
- Deleted nodes can re-add themselves if they restart (e.g., a flaky node that keeps restarting and
we cannot reach via SSH to stop the pageserver). This issue has been resolved by tombstone
mechanism in [#12036](https://github.com/neondatabase/neon/issues/12036)
- Process of node deletion is not graceful, i.e. it just imitates a node failure
In this context, "graceful" node deletion means that users do not experience any disruption or
negative effects, provided the system remains in a healthy state (i.e., the remaining pageservers
can handle the workload and all requirements are met). To achieve this, the system must perform
live migration of all tenant shards from the node being deleted while the node is still running
and continue processing all incoming requests. The node is removed only after all tenant shards
have been safely migrated.
Although live migrations can be achieved with the drain functionality, it leads to incorrect shard
placement, such as not matching availability zones. This results in unnecessary work to optimize
the placement that was just recently performed.
If we delete a node before its tenant shards are fully moved, the new node won't have all the
needed data (e.g. heatmaps) ready. This means user requests to the new node will be much slower at
first. If there are many tenant shards, this slowdown affects a huge amount of users.
Graceful node deletion is more complicated and can introduce new issues. It takes longer because
live migration of each tenant shard can last several minutes. Using non-blocking accessors may
also cause deletion to wait if other processes are holding inner state lock. It also gets trickier
because we need to handle other requests, like drain and fill, at the same time.
## Impacted components (e.g. pageserver, safekeeper, console, etc)
- storage controller
- pageserver (indirectly)
## Proposed implementation
### Tombstones
To resolve the problem of deleted nodes re-adding themselves, a tombstone mechanism was introduced
as part of the node stored information. Each node has a separate `NodeLifecycle` field with two
possible states: `Active` and `Deleted`. When node deletion completes, the database row is not
deleted but instead has its `NodeLifecycle` column switched to `Deleted`. Nodes with `Deleted`
lifecycle are treated as if the row is absent for most handlers, with several exceptions: reattach
and register functionality must be aware of tombstones. Additionally, new debug handlers are
available for listing and deleting tombstones via the `/debug/v1/tombstone` path.
### Gracefulness
The problem of making node deletion graceful is complex and involves several challenges:
- **Cancellable**: The operation must be cancellable to allow administrators to abort the process
if needed, e.g. if run by mistake.
- **Non-blocking**: We don't want to block deployment operations like draining/filling on the node
deletion process. We need clear policies for handling concurrent operations: what happens when a
drain/fill request arrives while deletion is in progress, and what happens when a delete request
arrives while drain/fill is in progress.
- **Persistent**: If the storage controller restarts during this long-running operation, we must
preserve progress and automatically resume the deletion process after the storage controller
restarts.
- **Migrated correctly**: We cannot simply use the existing drain mechanism for nodes scheduled
for deletion, as this would move shards to irrelevant locations. The drain process expects the
node to return, so it only moves shards to backup locations, not to their preferred AZs. It also
leaves secondary locations unmoved. This could result in unnecessary load on the storage
controller and inefficient resource utilization.
- **Force option**: Administrators need the ability to force immediate, non-graceful deletion when
time constraints or emergency situations require it, bypassing the normal graceful migration
process.
See below for a detailed breakdown of the proposed changes and mechanisms.
#### Node lifecycle
New `NodeLifecycle` enum and a matching database field with these values:
- `Active`: The normal state. All operations are allowed.
- `ScheduledForDeletion`: The node is marked to be deleted soon. Deletion may be in progress or
will happen later, but the node will eventually be removed. All operations are allowed.
- `Deleted`: The node is fully deleted. No operations are allowed, and the node cannot be brought
back. The only action left is to remove its record from the database. Any attempt to register a
node in this state will fail.
This state persists across storage controller restarts.
**State transition**
```
+--------------------+
+---| Active |<---------------------+
| +--------------------+ |
| ^ |
| start_node_delete | cancel_node_delete |
v | |
+----------------------------------+ |
| ScheduledForDeletion | |
+----------------------------------+ |
| |
| node_register |
| |
| delete_node (at the finish) |
| |
v |
+---------+ tombstone_delete +----------+
| Deleted |-------------------------------->| no row |
+---------+ +----------+
```
#### NodeSchedulingPolicy::Deleting
A `Deleting` variant to the `NodeSchedulingPolicy` enum. This means the deletion function is
running for the node right now. Only one node can have the `Deleting` policy at a time.
The `NodeSchedulingPolicy::Deleting` state is persisted in the database. However, after a storage
controller restart, any node previously marked as `Deleting` will have its scheduling policy reset
to `Pause`. The policy will only transition back to `Deleting` when the deletion operation is
actively started again, as triggered by the node's `NodeLifecycle::ScheduledForDeletion` state.
`NodeSchedulingPolicy` transition details:
1. When `node_delete` begins, set the policy to `NodeSchedulingPolicy::Deleting`.
2. If `node_delete` is cancelled (for example, due to a concurrent drain operation), revert the
policy to its previous value. The policy is persisted in storcon DB.
3. After `node_delete` completes, the final value of the scheduling policy is irrelevant, since
`NodeLifecycle::Deleted` prevents any further access to this field.
The deletion process cannot be initiated for nodes currently undergoing deployment-related
operations (`Draining`, `Filling`, or `PauseForRestart` policies). Deletion will only be triggered
once the node transitions to either the `Active` or `Pause` state.
#### OperationTracker
A replacement for `Option<OperationHandler> ongoing_operation`, the `OperationTracker` is a
dedicated service state object responsible for managing all long-running node operations (drain,
fill, delete) with robust concurrency control.
Key responsibilities:
- Orchestrates the execution of operations
- Supports cancellation of currently running operations
- Enforces operation constraints, e.g. allowing only single drain/fill operation at a time
- Persists deletion state, enabling recovery of pending deletions across restarts
- Ensures thread safety across concurrent requests
#### Attached tenant shard processing
When deleting a node, handle each attached tenant shard as follows:
1. Pick the best node to become the new attached (the candidate).
2. If the candidate already has this shard as a secondary:
- Create a new secondary for the shard on another suitable node.
Otherwise:
- Create a secondary for the shard on the candidate node.
3. Wait until all secondaries are ready and pre-warmed.
4. Promote the candidate's secondary to attached.
5. Remove the secondary from the node being deleted.
This process safely moves all attached shards before deleting the node.
#### Secondary tenant shard processing
When deleting a node, handle each secondary tenant shard as follows:
1. Choose the best node to become the new secondary.
2. Create a secondary for the shard on that node.
3. Wait until the new secondary is ready.
4. Remove the secondary from the node being deleted.
This ensures all secondary shards are safely moved before deleting the node.
### Reliability, failure modes and corner cases
In case of a storage controller failure and following restart, the system behavior depends on the
`NodeLifecycle` state:
- If `NodeLifecycle` is `Active`: No action is taken for this node.
- If `NodeLifecycle` is `Deleted`: The node will not be re-added.
- If `NodeLifecycle` is `ScheduledForDeletion`: A deletion background task will be launched for
this node.
In case of a pageserver node failure during deletion, the behavior depends on the `force` flag:
- If `force` is set: The node deletion will proceed regardless of the node's availability.
- If `force` is not set: The deletion will be retried a limited number of times. If the node
remains unavailable, the deletion process will pause and automatically resume when the node
becomes healthy again.
### Operations concurrency
The following sections describe the behavior when different types of requests arrive at the storage
controller and how they interact with ongoing operations.
#### Delete request
Handler: `PUT /control/v1/node/:node_id/delete`
1. If node lifecycle is `NodeLifecycle::ScheduledForDeletion`:
- Return `200 OK`: there is already an ongoing deletion request for this node
2. Update & persist lifecycle to `NodeLifecycle::ScheduledForDeletion`
3. Persist current scheduling policy
4. If there is no active operation (drain/fill/delete):
- Run deletion process for this node
#### Cancel delete request
Handler: `DELETE /control/v1/node/:node_id/delete`
1. If node lifecycle is not `NodeLifecycle::ScheduledForDeletion`:
- Return `404 Not Found`: there is no current deletion request for this node
2. If the active operation is deleting this node, cancel it
3. Update & persist lifecycle to `NodeLifecycle::Active`
4. Restore the last scheduling policy from persistence
#### Drain/fill request
1. If there are already ongoing drain/fill processes:
- Return `409 Conflict`: queueing of drain/fill processes is not supported
2. If there is an ongoing delete process:
- Cancel it and wait until it is cancelled
3. Run the drain/fill process
4. After the drain/fill process is cancelled or finished:
- Try to find another candidate to delete and run the deletion process for that node
#### Drain/fill cancel request
1. If the active operation is not the related process:
- Return `400 Bad Request`: cancellation request is incorrect, operations are not the same
2. Cancel the active operation
3. Try to find another candidate to delete and run the deletion process for that node
## Definition of Done
- [x] Fix flaky node scenario and introduce related debug handlers
- [ ] Node deletion intent is persistent - a node will be eventually deleted after a deletion
request regardless of draining/filling requests and restarts
- [ ] Node deletion can be graceful - deletion completes only after moving all tenant shards to
recommended locations
- [ ] Deploying does not break due to long deletions - drain/fill operations override deletion
process and deletion resumes after drain/fill completes
- [ ] `force` flag is implemented and provides fast, failure-tolerant node removal (e.g., when a
pageserver node does not respond)
- [ ] Legacy delete handler code is removed from storage_controller, test_runner, and storcon_cli

View File

@@ -1,10 +1,9 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use std::fmt::Display;
use chrono::{DateTime, Utc};
use jsonwebtoken::jwk::JwkSet;
use serde::{Deserialize, Serialize, Serializer};
use std::fmt::Display;
use crate::privilege::Privilege;
use crate::spec::{ComputeSpec, Database, ExtVersion, PgIdent, Role};
@@ -49,7 +48,7 @@ pub struct ExtensionInstallResponse {
/// Status of the LFC prewarm process. The same state machine is reused for
/// both autoprewarm (prewarm after compute/Postgres start using the previously
/// stored LFC state) and explicit prewarming via API.
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcPrewarmState {
/// Default value when compute boots up.
@@ -59,7 +58,14 @@ pub enum LfcPrewarmState {
Prewarming,
/// We found requested LFC state in the endpoint storage and
/// completed prewarming successfully.
Completed,
Completed {
total: i32,
prewarmed: i32,
skipped: i32,
state_download_time_ms: u32,
uncompress_time_ms: u32,
prewarm_time_ms: u32,
},
/// Unexpected error happened during prewarming. Note, `Not Found 404`
/// response from the endpoint storage is explicitly excluded here
/// because it can normally happen on the first compute start,
@@ -84,7 +90,7 @@ impl Display for LfcPrewarmState {
match self {
LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
LfcPrewarmState::Completed => f.write_str("Completed"),
LfcPrewarmState::Completed { .. } => f.write_str("Completed"),
LfcPrewarmState::Skipped => f.write_str("Skipped"),
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
LfcPrewarmState::Cancelled => f.write_str("Cancelled"),
@@ -92,26 +98,36 @@ impl Display for LfcPrewarmState {
}
}
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcOffloadState {
#[default]
NotOffloaded,
Offloading,
Completed,
Completed {
state_query_time_ms: u32,
compress_time_ms: u32,
state_upload_time_ms: u32,
},
Failed {
error: String,
},
/// LFC state was empty so it wasn't offloaded
Skipped,
}
#[derive(Serialize, Debug, Clone, PartialEq)]
#[derive(Serialize, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
/// Response of /promote
pub enum PromoteState {
NotPromoted,
Completed,
Failed { error: String },
Completed {
lsn_wait_time_ms: u32,
pg_promote_time_ms: u32,
reconfigure_time_ms: u32,
},
Failed {
error: String,
},
}
#[derive(Deserialize, Default, Debug)]

View File

@@ -1580,7 +1580,7 @@ impl TenantShard {
}
#[instrument(skip_all)]
pub(crate) async fn preload(
pub(crate) async fn preload(
self: &Arc<Self>,
remote_storage: &GenericRemoteStorage,
cancel: CancellationToken,

View File

@@ -88,7 +88,7 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
use super::remote_timeline_client::index::{GcCompactionState, IndexPart};
use super::remote_timeline_client::index::{GcCompactionState, IndexPart, LayerFileMetadata};
use super::secondary::heatmap::HeatMapLayer;
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::tasks::log_compaction_error;
@@ -4205,6 +4205,12 @@ impl Timeline {
let desc: PersistentLayerDesc = hl.name.clone().into();
let layer = guard.try_get_from_key(&desc.key())?;
// Make sure the layer in the old heatmap is the same generation one as in the layer
// map otherwise we can in some edge case keep old obsolete layers in the heatmap.
if layer.metadata().generation != hl.metadata.generation {
return None;
}
if layer.visibility() == LayerVisibilityHint::Covered {
return None;
}
@@ -6474,6 +6480,7 @@ pub struct DeltaLayerTestDesc {
pub lsn_range: Range<Lsn>,
pub key_range: Range<Key>,
pub data: Vec<(Key, Lsn, Value)>,
pub resident: bool,
}
#[cfg(test)]
@@ -6491,12 +6498,22 @@ impl DeltaLayerTestDesc {
lsn_range,
key_range,
data,
// Default test code creates resident layers.
resident: true,
}
}
pub fn new_with_inferred_key_range(
lsn_range: Range<Lsn>,
data: Vec<(Key, Lsn, Value)>,
) -> Self {
Self::new_with_inferred_key_range_and_resident_state(lsn_range, data, true)
}
pub fn new_with_inferred_key_range_and_resident_state(
lsn_range: Range<Lsn>,
data: Vec<(Key, Lsn, Value)>,
resident: bool,
) -> Self {
let key_min = data.iter().map(|(key, _, _)| key).min().unwrap();
let key_max = data.iter().map(|(key, _, _)| key).max().unwrap();
@@ -6504,6 +6521,7 @@ impl DeltaLayerTestDesc {
key_range: (*key_min)..(key_max.next()),
lsn_range,
data,
resident
}
}
@@ -7505,6 +7523,30 @@ impl Timeline {
check_start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if !deltas.resident {
// Don't need to bother creating an on-disk file, we just want the metadata for a non-resident layer.
let delta_layer = Layer::for_evicted(
self.conf,
self,
deltas.layer_name(),
LayerFileMetadata {
generation: self.generation,
shard: self.get_shard_index(),
file_size: 1024,
},
);
info!("force created non-resident delta layer {}", deltas.layer_name());
{
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
guard
.open_mut()
.unwrap()
.force_insert_optionally_resident_layer(delta_layer);
}
return Ok(());
}
let last_record_lsn = self.get_last_record_lsn();
deltas
.data
@@ -8263,6 +8305,148 @@ mod tests {
));
}
#[tokio::test]
async fn test_heatmap_generation_removes_layers_from_old_generation() {
use std::time::SystemTime;
use utils::generation::Generation;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::secondary::heatmap::{HeatMapLayer, HeatMapTimeline as HeatMapTimelineStruct};
let harness = TenantHarness::create("heatmaheatmap_genereation_removes_layers_from_old_generationp_generation").await.unwrap();
// Create your existing layer descriptions
let covered_delta = DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
vec![(
Key::from_hex("620000000033333333444444445500000000").unwrap(),
Lsn(0x11),
Value::Image(test_img("foo")),
)],
);
// This visible layer is non-resident on disk. This is important to reproduce the failure as
// a resident file will take priority over the previous heatmap even without the fix for
// this issue.
let visible_delta = DeltaLayerTestDesc::new_with_inferred_key_range_and_resident_state(
Lsn(0x10)..Lsn(0x20),
vec![(
Key::from_hex("720000000033333333444444445500000000").unwrap(),
Lsn(0x11),
Value::Image(test_img("foo")),
)],
false, // Non-resident
);
let l0_delta = DeltaLayerTestDesc::new(
Lsn(0x20)..Lsn(0x30),
Key::from_hex("000000000000000000000000000000000000").unwrap()
..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(),
vec![(
Key::from_hex("720000000033333333444444445500000000").unwrap(),
Lsn(0x25),
Value::Image(test_img("foo")),
)],
);
let delta_layers = vec![
covered_delta.clone(),
visible_delta.clone(),
l0_delta.clone(),
];
let image_layer = (
Lsn(0x40),
vec![(
Key::from_hex("620000000033333333444444445500000000").unwrap(),
test_img("bar"),
)],
);
let image_layers = vec![image_layer];
let (tenant, ctx) = harness.load().await;
// Create timeline with current generation (0xdeadbeef by default)
let timeline = tenant
.create_test_timeline_with_layers(
TimelineId::generate(),
Lsn(0x10),
PgMajorVersion::PG14,
&ctx,
Vec::new(), // in-memory layers
delta_layers,
image_layers,
Lsn(0x100),
)
.await
.unwrap();
// Now create a previous heatmap with the visible_delta layer from an older generation
let current_layer_metadata = LayerFileMetadata {
generation: timeline.generation,
shard: timeline.get_shard_index(),
file_size: 1024,
};
let old_generation = Generation::new(0x12345678); // Older than 0xdeadbeef
let old_layer_metadata = LayerFileMetadata {
generation: old_generation,
shard: timeline.get_shard_index(),
file_size: 1024,
};
// Create heatmap layers that reference the same keys but with old generation
let prev_heatmap_layers = vec![
HeatMapLayer::new(
covered_delta.layer_name(),
current_layer_metadata.clone(),
SystemTime::now(),
false, // not cold
),
HeatMapLayer::new(
visible_delta.layer_name(),
// Visible delta layer is from an older generation in heatmap
old_layer_metadata.clone(),
SystemTime::now(),
false, // not cold
),
HeatMapLayer::new(
l0_delta.layer_name(),
current_layer_metadata.clone(),
SystemTime::now(),
false, // not cold
),
];
// Create the previous heatmap with old generation layers
let prev_heatmap = HeatMapTimelineStruct::new(timeline.timeline_id, prev_heatmap_layers);
// Set the previous heatmap on the timeline
timeline
.previous_heatmap
.store(Some(Arc::new(PreviousHeatmap::Active {
heatmap: prev_heatmap,
read_at: std::time::Instant::now(),
end_lsn: None,
})));
// Layer visibility is an input to heatmap generation, so refresh it first
timeline.update_layer_visibility().await.unwrap();
// Generate a new heatmap - this should filter out the old generation layers
let heatmap = timeline
.generate_heatmap()
.await
.expect("Infallible while timeline is not shut down");
assert_eq!(heatmap.timeline_id, timeline.timeline_id);
// Verify that layers exist but they should be the current generation ones,
// not the old generation ones from previous_heatmap
for layer in heatmap.all_layers() {
assert_eq!(
layer.metadata.generation,
timeline.generation,
"Heatmap should only contain current generation layers, not old ones"
);
}
}
#[tokio::test]
async fn two_layer_eviction_attempts_at_the_same_time() {
let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time")

View File

@@ -129,6 +129,16 @@ pub(super) fn reconcile(
// Construct Decisions for layers that are found locally, if they're in remote metadata. Otherwise
// construct DismissedLayers to get rid of them.
for (layer_name, local_metadata) in local_layers {
// FIXME: This should probably take generation into account. Currently it's possible to have
// an old generation file on disk while a newer one with same name is in index (because
// primary just split shard) and we miss that here. We are saved by the check below because
// the file size is very likely to be different (and if it isn't then the file contents will
// probably be the same anyway in case of shard split), but it's confusing that this logic
// doesn't account for name collisions from older generations. Ideally, we should consider a
// local file from an older generation than the one in the index to be a different file and
// return `DismissedLayer::LocalOnly` if generations don't match. Right now though,
// layer_name has the generation part stripped so we'd need to re-parse the generation from
// the file name here or back in scan_timeline_dir and add it to LocalLayerFileMetadata.
let Some(remote_metadata) = index_part.layer_metadata.get(&layer_name) else {
result.push((layer_name, Err(DismissedLayer::LocalOnly(local_metadata))));
continue;

View File

@@ -645,8 +645,13 @@ impl OpenLayerManager {
#[cfg(test)]
pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
self.force_insert_optionally_resident_layer(layer.as_ref().clone());
}
#[cfg(test)]
pub(crate) fn force_insert_optionally_resident_layer(&mut self, layer: Layer) {
let mut updates = self.layer_map.batch_update();
Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
updates.flush()
}

View File

@@ -458,7 +458,7 @@ pub(crate) enum LocalProxyConnError {
impl ReportableError for HttpConnError {
fn get_error_kind(&self) -> ErrorKind {
match self {
HttpConnError::ConnectError(_) => ErrorKind::Compute,
HttpConnError::ConnectError(e) => e.get_error_kind(),
HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute,
HttpConnError::PostgresConnectionError(p) => match p.as_db_error() {
// user provided a wrong database name

View File

@@ -5,12 +5,17 @@ use std::sync::Arc;
use bytes::Bytes;
use http::Method;
use http::header::{AUTHORIZATION, CONTENT_TYPE, HOST};
use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_MAX_AGE, ACCESS_CONTROL_REQUEST_HEADERS, ALLOW,
AUTHORIZATION, CONTENT_TYPE, HOST, ORIGIN,
};
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use http_body_util::{BodyExt, Empty, Full};
use http_utils::error::ApiError;
use hyper::body::Incoming;
use hyper::http::{HeaderName, HeaderValue};
use hyper::http::response::Builder;
use hyper::http::{HeaderMap, HeaderName, HeaderValue};
use hyper::{Request, Response, StatusCode};
use indexmap::IndexMap;
use moka::sync::Cache;
@@ -67,6 +72,15 @@ use crate::util::deserialize_json_string;
static EMPTY_JSON_SCHEMA: &str = r#"{"schemas":[]}"#;
const INTROSPECTION_SQL: &str = POSTGRESQL_INTROSPECTION_SQL;
const HEADER_VALUE_ALLOW_ALL_ORIGINS: HeaderValue = HeaderValue::from_static("*");
// CORS headers values
const ACCESS_CONTROL_ALLOW_METHODS_VALUE: HeaderValue =
HeaderValue::from_static("GET, POST, PATCH, PUT, DELETE, OPTIONS");
const ACCESS_CONTROL_MAX_AGE_VALUE: HeaderValue = HeaderValue::from_static("86400");
const ACCESS_CONTROL_EXPOSE_HEADERS_VALUE: HeaderValue = HeaderValue::from_static(
"Content-Encoding, Content-Location, Content-Range, Content-Type, Date, Location, Server, Transfer-Encoding, Range-Unit",
);
const ACCESS_CONTROL_ALLOW_HEADERS_VALUE: HeaderValue = HeaderValue::from_static("Authorization");
// A wrapper around the DbSchema that allows for self-referencing
#[self_referencing]
@@ -137,6 +151,8 @@ pub struct ApiConfig {
pub role_claim_key: String,
#[serde(default, deserialize_with = "deserialize_comma_separated_option")]
pub db_extra_search_path: Option<Vec<String>>,
#[serde(default, deserialize_with = "deserialize_comma_separated_option")]
pub server_cors_allowed_origins: Option<Vec<String>>,
}
// The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
@@ -165,7 +181,13 @@ impl DbSchemaCache {
}
}
pub async fn get_cached_or_remote(
pub fn get_cached(
&self,
endpoint_id: &EndpointCacheKey,
) -> Option<Arc<(ApiConfig, DbSchemaOwned)>> {
count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id))
}
pub async fn get_remote(
&self,
endpoint_id: &EndpointCacheKey,
auth_header: &HeaderValue,
@@ -174,47 +196,42 @@ impl DbSchemaCache {
ctx: &RequestContext,
config: &'static ProxyConfig,
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
match cache_result {
Some(v) => Ok(v),
None => {
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
let remote_value = self
.get_remote(auth_header, connection_string, client, ctx, config)
.await;
let (api_config, schema_owned) = match remote_value {
Ok((api_config, schema_owned)) => (api_config, schema_owned),
Err(e @ RestError::SchemaTooLarge) => {
// for the case where the schema is too large, we cache an empty dummy value
// all the other requests will fail without triggering the introspection query
let schema_owned = serde_json::from_str::<DbSchemaOwned>(EMPTY_JSON_SCHEMA)
.map_err(|e| JsonDeserialize { source: e })?;
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
let remote_value = self
.internal_get_remote(auth_header, connection_string, client, ctx, config)
.await;
let (api_config, schema_owned) = match remote_value {
Ok((api_config, schema_owned)) => (api_config, schema_owned),
Err(e @ RestError::SchemaTooLarge) => {
// for the case where the schema is too large, we cache an empty dummy value
// all the other requests will fail without triggering the introspection query
let schema_owned = serde_json::from_str::<DbSchemaOwned>(EMPTY_JSON_SCHEMA)
.map_err(|e| JsonDeserialize { source: e })?;
let api_config = ApiConfig {
db_schemas: vec![],
db_anon_role: None,
db_max_rows: None,
db_allowed_select_functions: vec![],
role_claim_key: String::new(),
db_extra_search_path: None,
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value);
return Err(e);
}
Err(e) => {
return Err(e);
}
let api_config = ApiConfig {
db_schemas: vec![],
db_anon_role: None,
db_max_rows: None,
db_allowed_select_functions: vec![],
role_claim_key: String::new(),
db_extra_search_path: None,
server_cors_allowed_origins: None,
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
self.0.insert(endpoint_id.clone(), value);
return Err(e);
}
}
Err(e) => {
return Err(e);
}
};
let value = Arc::new((api_config, schema_owned));
count_cache_insert(CacheKind::Schema);
self.0.insert(endpoint_id.clone(), value.clone());
Ok(value)
}
pub async fn get_remote(
async fn internal_get_remote(
&self,
auth_header: &HeaderValue,
connection_string: &str,
@@ -531,7 +548,7 @@ pub(crate) async fn handle(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
let result = handle_inner(cancel, config, &ctx, request, backend).await;
let mut response = match result {
let response = match result {
Ok(r) => {
ctx.set_success();
@@ -640,9 +657,6 @@ pub(crate) async fn handle(
}
};
response
.headers_mut()
.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
Ok(response)
}
@@ -722,6 +736,37 @@ async fn handle_inner(
}
}
fn apply_common_cors_headers(
response: &mut Builder,
request_headers: &HeaderMap,
allowed_origins: Option<&Vec<String>>,
) {
let request_origin = request_headers
.get(ORIGIN)
.map(|v| v.to_str().unwrap_or(""));
let response_allow_origin = match (request_origin, allowed_origins) {
(Some(or), Some(allowed_origins)) => {
if allowed_origins.iter().any(|o| o == or) {
Some(HeaderValue::from_str(or).unwrap_or(HEADER_VALUE_ALLOW_ALL_ORIGINS))
} else {
None
}
}
(Some(_), None) => Some(HEADER_VALUE_ALLOW_ALL_ORIGINS),
_ => None,
};
if let Some(h) = response.headers_mut() {
h.insert(
ACCESS_CONTROL_EXPOSE_HEADERS,
ACCESS_CONTROL_EXPOSE_HEADERS_VALUE,
);
if let Some(origin) = response_allow_origin {
h.insert(ACCESS_CONTROL_ALLOW_ORIGIN, origin);
}
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_rest_inner(
config: &'static ProxyConfig,
@@ -733,12 +778,6 @@ async fn handle_rest_inner(
jwt: String,
backend: Arc<PoolingBackend>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, RestError> {
// validate the jwt token
let jwt_parsed = backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.await
.map_err(HttpConnError::from)?;
let db_schema_cache =
config
.rest_config
@@ -754,28 +793,83 @@ async fn handle_rest_inner(
message: "Failed to get endpoint cache key".to_string(),
}))?;
let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?;
let (parts, originial_body) = request.into_parts();
// try and get the cached entry for this endpoint
// it contains the api config and the introspected db schema
let cached_entry = db_schema_cache.get_cached(&endpoint_cache_key);
let allowed_origins = cached_entry
.as_ref()
.and_then(|arc| arc.0.server_cors_allowed_origins.as_ref());
let mut response = Response::builder();
apply_common_cors_headers(&mut response, &parts.headers, allowed_origins);
// handle the OPTIONS request
if parts.method == Method::OPTIONS {
let allowed_headers = parts
.headers
.get(ACCESS_CONTROL_REQUEST_HEADERS)
.and_then(|a| a.to_str().ok())
.filter(|v| !v.is_empty())
.map_or_else(
|| "Authorization".to_string(),
|v| format!("{v}, Authorization"),
);
return response
.status(StatusCode::OK)
.header(
ACCESS_CONTROL_ALLOW_METHODS,
ACCESS_CONTROL_ALLOW_METHODS_VALUE,
)
.header(ACCESS_CONTROL_MAX_AGE, ACCESS_CONTROL_MAX_AGE_VALUE)
.header(
ACCESS_CONTROL_ALLOW_HEADERS,
HeaderValue::from_str(&allowed_headers)
.unwrap_or(ACCESS_CONTROL_ALLOW_HEADERS_VALUE),
)
.header(ALLOW, ACCESS_CONTROL_ALLOW_METHODS_VALUE)
.body(Empty::new().map_err(|x| match x {}).boxed())
.map_err(|e| {
RestError::SubzeroCore(InternalError {
message: e.to_string(),
})
});
}
// validate the jwt token
let jwt_parsed = backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.await
.map_err(HttpConnError::from)?;
let auth_header = parts
.headers
.get(AUTHORIZATION)
.ok_or(RestError::SubzeroCore(InternalError {
message: "Authorization header is required".to_string(),
}))?;
let mut client = backend.connect_to_local_proxy(ctx, conn_info).await?;
let entry = db_schema_cache
.get_cached_or_remote(
&endpoint_cache_key,
auth_header,
connection_string,
&mut client,
ctx,
config,
)
.await?;
let entry = match cached_entry {
Some(e) => e,
None => {
// if not cached, get the remote entry (will run the introspection query)
db_schema_cache
.get_remote(
&endpoint_cache_key,
auth_header,
connection_string,
&mut client,
ctx,
config,
)
.await?
}
};
let (api_config, db_schema_owned) = entry.as_ref();
let db_schema = db_schema_owned.borrow_schema();
let db_schemas = &api_config.db_schemas; // list of schemas available for the api
@@ -999,8 +1093,8 @@ async fn handle_rest_inner(
let _metrics = client.metrics(ctx); // FIXME: is everything in the context set correctly?
// send the request to the local proxy
let response = make_raw_local_proxy_request(&mut client, headers, req_body).await?;
let (parts, body) = response.into_parts();
let proxy_response = make_raw_local_proxy_request(&mut client, headers, req_body).await?;
let (response_parts, body) = proxy_response.into_parts();
let max_response = config.http_config.max_response_size_bytes;
let bytes = read_body_with_limit(body, max_response)
@@ -1009,7 +1103,7 @@ async fn handle_rest_inner(
// if the response status is greater than 399, then it is an error
// FIXME: check if there are other error codes or shapes of the response
if parts.status.as_u16() > 399 {
if response_parts.status.as_u16() > 399 {
// turn this postgres error from the json into PostgresError
let postgres_error = serde_json::from_slice(&bytes)
.map_err(|e| RestError::SubzeroCore(JsonDeserialize { source: e }))?;
@@ -1175,7 +1269,7 @@ async fn handle_rest_inner(
.boxed();
// build the response
let mut response = Response::builder()
response = response
.status(StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR))
.header(CONTENT_TYPE, http_content_type);

View File

@@ -612,19 +612,25 @@ pub async fn handle_request(
}
}
let max_term = statuses
.iter()
.map(|(status, _)| status.acceptor_state.term)
.max()
.unwrap();
// Find the most advanced safekeeper
let (status, i) = statuses
.into_iter()
.max_by_key(|(status, _)| {
(
status.acceptor_state.epoch,
status.flush_lsn,
/* BEGIN_HADRON */
// We need to pull from the SK with the highest term.
// This is because another compute may come online and vote the same highest term again on the other two SKs.
// Then, there will be 2 computes running on the same term.
status.acceptor_state.term,
/* END_HADRON */
status.flush_lsn,
status.commit_lsn,
)
})
@@ -634,6 +640,22 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
// TODO(diko): This is hadron only check to make sure that we pull the timeline
// from the safekeeper with the highest term during timeline restore.
// We could avoid returning the error by calling bump_term after pull_timeline.
// However, this is not a big deal because we retry the pull_timeline requests.
// The check should be removed together with removing custom hadron logic for
// safekeeper restore.
if wait_for_peer_timeline_status && status.acceptor_state.term != max_term {
return Err(ApiError::PreconditionFailed(
format!(
"choosen safekeeper {} has term {}, but the most advanced term is {}",
safekeeper_host, status.acceptor_state.term, max_term
)
.into(),
));
}
match pull_timeline(
status,
safekeeper_host,

View File

@@ -195,12 +195,14 @@ impl StateSK {
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let result = self.state_mut().membership_switch(to).await?;
let flush_lsn = self.flush_lsn();
let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn);
Ok(TimelineMembershipSwitchResponse {
previous_conf: result.previous_conf,
current_conf: result.current_conf,
last_log_term: self.state().acceptor_state.term,
flush_lsn: self.flush_lsn(),
last_log_term,
flush_lsn,
})
}

View File

@@ -644,6 +644,7 @@ async fn handle_tenant_timeline_safekeeper_migrate(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
// TODO(diko): it's not PS operation, there should be a different permission scope.
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
@@ -665,6 +666,23 @@ async fn handle_tenant_timeline_safekeeper_migrate(
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_safekeeper_migrate_abort(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
// TODO(diko): it's not PS operation, there should be a different permission scope.
check_permissions(&req, Scope::PageServerApi)?;
maybe_rate_limit(&req, tenant_id).await;
service
.tenant_timeline_safekeeper_migrate_abort(tenant_id, timeline_id)
.await?;
json_response(StatusCode::OK, ())
}
async fn handle_tenant_timeline_lsn_lease(
service: Arc<Service>,
req: Request<Body>,
@@ -2611,6 +2629,16 @@ pub fn make_router(
)
},
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate_abort",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_safekeeper_migrate_abort,
RequestName("v1_tenant_timeline_safekeeper_migrate_abort"),
)
},
)
// LSN lease passthrough to all shards
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",

View File

@@ -24,12 +24,12 @@ use pageserver_api::controller_api::{
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::PgVersionId;
use safekeeper_api::Term;
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
use safekeeper_api::models::{
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
TimelineMembershipSwitchResponse,
};
use safekeeper_api::{INITIAL_TERM, Term};
use safekeeper_client::mgmt_api;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -1230,10 +1230,7 @@ impl Service {
}
// It it is the same new_sk_set, we can continue the migration (retry).
} else {
let prev_finished = timeline.cplane_notified_generation == timeline.generation
&& timeline.sk_set_notified_generation == timeline.generation;
if !prev_finished {
if !is_migration_finished(&timeline) {
// The previous migration is committed, but the finish step failed.
// Safekeepers/cplane might not know about the last membership configuration.
// Retry the finish step to ensure smooth migration.
@@ -1298,13 +1295,7 @@ impl Service {
)
.await?;
let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
for res in results.into_iter().flatten() {
let sk_position = (res.last_log_term, res.flush_lsn);
if sync_position < sk_position {
sync_position = sk_position;
}
}
let sync_position = Self::get_sync_position(&results)?;
tracing::info!(
%generation,
@@ -1551,6 +1542,8 @@ impl Service {
timeline_id: TimelineId,
timeline: &TimelinePersistence,
) -> Result<(), ApiError> {
tracing::info!(generation=?timeline.generation, sk_set=?timeline.sk_set, new_sk_set=?timeline.new_sk_set, "retrying finish safekeeper migration");
if timeline.new_sk_set.is_some() {
// Logical error, should never happen.
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -1598,4 +1591,152 @@ impl Service {
Ok(())
}
/// Get membership switch responses from all safekeepers and return the sync position.
///
/// Sync position is a position equal or greater than the commit position.
/// It is guaranteed that all WAL entries with (last_log_term, flush_lsn)
/// greater than the sync position are not committed (= not on a quorum).
///
/// Returns error if there is no quorum of successful responses.
fn get_sync_position(
responses: &[mgmt_api::Result<TimelineMembershipSwitchResponse>],
) -> Result<(Term, Lsn), ApiError> {
let quorum_size = responses.len() / 2 + 1;
let mut wal_positions = responses
.iter()
.flatten()
.map(|res| (res.last_log_term, res.flush_lsn))
.collect::<Vec<_>>();
// Should be already checked if the responses are from tenant_timeline_set_membership_quorum.
if wal_positions.len() < quorum_size {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful responses to get sync position: {}/{}",
wal_positions.len(),
quorum_size,
)));
}
wal_positions.sort();
Ok(wal_positions[quorum_size - 1])
}
/// Abort ongoing safekeeper migration.
pub(crate) async fn tenant_timeline_safekeeper_migrate_abort(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
// TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineSafekeeperMigrate,
)
.await;
// Fetch current timeline configuration from the configuration storage.
let timeline = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(timeline) = timeline else {
return Err(ApiError::NotFound(
anyhow::anyhow!(
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
)
.into(),
));
};
let mut generation = SafekeeperGeneration::new(timeline.generation as u32);
let Some(new_sk_set) = &timeline.new_sk_set else {
// No new_sk_set -> no active migration that we can abort.
tracing::info!("timeline has no active migration");
if !is_migration_finished(&timeline) {
// The last migration is committed, but the finish step failed.
// Safekeepers/cplane might not know about the last membership configuration.
// Retry the finish step to make the timeline state clean.
self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
.await?;
}
return Ok(());
};
tracing::info!(sk_set=?timeline.sk_set, ?new_sk_set, ?generation, "aborting timeline migration");
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
let new_safekeepers = self.get_safekeepers(new_sk_set)?;
let cur_sk_member_set =
Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
// Increment current generation and remove new_sk_set from the timeline to abort the migration.
generation = generation.next();
let mconf = membership::Configuration {
generation,
members: cur_sk_member_set,
new_members: None,
};
// Exclude safekeepers which were added during the current migration.
let cur_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
let exclude_safekeepers = new_safekeepers
.into_iter()
.filter(|sk| !cur_ids.contains(&sk.get_id()))
.collect::<Vec<_>>();
let exclude_requests = exclude_safekeepers
.iter()
.map(|sk| TimelinePendingOpPersistence {
sk_id: sk.skp.id,
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: generation.into_inner() as i32,
op_kind: SafekeeperTimelineOpKind::Exclude,
})
.collect::<Vec<_>>();
let cur_sk_set = cur_safekeepers
.iter()
.map(|sk| sk.get_id())
.collect::<Vec<_>>();
// Persist new mconf and exclude requests.
self.persistence
.update_timeline_membership(
tenant_id,
timeline_id,
generation,
&cur_sk_set,
None,
&exclude_requests,
)
.await?;
// At this point we have already commited the abort, but still need to notify
// cplane/safekeepers with the new mconf. That's what finish_safekeeper_migration does.
self.finish_safekeeper_migration(
tenant_id,
timeline_id,
&cur_safekeepers,
&mconf,
&exclude_safekeepers,
)
.await?;
Ok(())
}
}
fn is_migration_finished(timeline: &TimelinePersistence) -> bool {
timeline.cplane_notified_generation == timeline.generation
&& timeline.sk_set_notified_generation == timeline.generation
}

View File

@@ -2313,6 +2313,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
timeline_id: TimelineId,
new_sk_set: list[int],
):
log.info(f"migrate_safekeepers({tenant_id}, {timeline_id}, {new_sk_set})")
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate",
@@ -2322,6 +2323,19 @@ class NeonStorageController(MetricsGetter, LogUtils):
response.raise_for_status()
log.info(f"migrate_safekeepers success: {response.json()}")
def abort_safekeeper_migration(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
):
response = self.request(
"POST",
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate_abort",
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
response.raise_for_status()
log.info(f"abort_safekeeper_migration success: {response.json()}")
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
"""
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}

View File

@@ -145,6 +145,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
stop_and_check_lsn(secondary, None)
if method == PromoteMethod.COMPUTE_CTL:
log.info("Restarting primary to check new config")
secondary.stop()
# In production, compute ultimately receives new compute spec from cplane.
secondary.respec(mode="Primary")

View File

@@ -286,3 +286,265 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder):
assert re.match(r".*Timeline .* deleted.*", exc.value.response.text)
# The timeline should remain deleted.
expect_deleted(second_sk)
def test_safekeeper_migration_stale_timeline(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper migration handles stale timeline correctly by migrating to
a safekeeper with a stale timeline.
1. Check that we are waiting for the stale timeline to catch up with the commit lsn.
The migration might fail if there is no compute to advance the WAL.
2. Check that we rely on last_log_term (and not the current term) when waiting for the
sync_position on step 7.
3. Check that migration succeeds if the compute is running.
"""
neon_env_builder.num_safekeepers = 2
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
env.storage_controller.allowed_errors.append(".*not enough successful .* to reach quorum.*")
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
active_sk = env.get_safekeeper(mconf["sk_set"][0])
other_sk = [sk for sk in env.safekeepers if sk.id != active_sk.id][0]
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=[active_sk.id])
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (0)")
# Pull the timeline to other_sk, so other_sk now has a "stale" timeline on it.
other_sk.pull_timeline([active_sk], env.initial_tenant, env.initial_timeline)
# Advance the WAL on active_sk.
ep.safe_psql("INSERT INTO t VALUES (1)")
# The test is more tricky if we have the same last_log_term but different term/flush_lsn.
# Stop the active_sk during the endpoint shutdown because otherwise compute_ctl runs
# sync_safekeepers and advances last_log_term on active_sk.
active_sk.stop()
ep.stop(mode="immediate")
active_sk.start()
active_sk_status = active_sk.http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
other_sk_status = other_sk.http_client().timeline_status(
env.initial_tenant, env.initial_timeline
)
# other_sk should have the same last_log_term, but a stale flush_lsn.
assert active_sk_status.last_log_term == other_sk_status.last_log_term
assert active_sk_status.flush_lsn > other_sk_status.flush_lsn
commit_lsn = active_sk_status.flush_lsn
# Bump the term on other_sk to make it higher than active_sk.
# This is to make sure we don't use current term instead of last_log_term in the algorithm.
other_sk.http_client().term_bump(
env.initial_tenant, env.initial_timeline, active_sk_status.term + 100
)
# TODO(diko): now it fails because the timeline on other_sk is stale and there is no compute
# to catch up it with active_sk. It might be fixed in https://databricks.atlassian.net/browse/LKB-946
# if we delete stale timelines before starting the migration.
# But the rest of the test is still valid: we should not lose committed WAL after the migration.
with pytest.raises(
StorageControllerApiException, match="not enough successful .* to reach quorum"
):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [other_sk.id]
)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["new_sk_set"] == [other_sk.id]
assert mconf["sk_set"] == [active_sk.id]
assert mconf["generation"] == 2
# Start the endpoint, so it advances the WAL on other_sk.
ep.start(safekeeper_generation=2, safekeepers=[active_sk.id, other_sk.id])
# Now the migration should succeed.
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [other_sk.id]
)
# Check that we didn't lose committed WAL.
assert (
other_sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline).flush_lsn
>= commit_lsn
)
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder):
"""
Test that we pull the timeline from the most advanced safekeeper during the
migration and do not lose committed WAL.
"""
neon_env_builder.num_safekeepers = 4
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 3,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
sk_set = mconf["sk_set"]
assert len(sk_set) == 3
other_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=sk_set)
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (0)")
# Stop one sk, so we have a lagging WAL on it.
env.get_safekeeper(sk_set[0]).stop()
# Advance the WAL on the other sks.
ep.safe_psql("INSERT INTO t VALUES (1)")
# Stop other sks to make sure compute_ctl doesn't advance the last_log_term on them during shutdown.
for sk_id in sk_set[1:]:
env.get_safekeeper(sk_id).stop()
ep.stop(mode="immediate")
for sk_id in sk_set:
env.get_safekeeper(sk_id).start()
# Bump the term on the lagging sk to make sure we don't use it to choose the most advanced sk.
env.get_safekeeper(sk_set[0]).http_client().term_bump(
env.initial_tenant, env.initial_timeline, 100
)
def get_commit_lsn(sk_set: list[int]):
flush_lsns = []
last_log_terms = []
for sk_id in sk_set:
sk = env.get_safekeeper(sk_id)
status = sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline)
flush_lsns.append(status.flush_lsn)
last_log_terms.append(status.last_log_term)
# In this test we assume that all sks have the same last_log_term.
assert len(set(last_log_terms)) == 1
flush_lsns.sort(reverse=True)
commit_lsn = flush_lsns[len(sk_set) // 2]
log.info(f"sk_set: {sk_set}, flush_lsns: {flush_lsns}, commit_lsn: {commit_lsn}")
return commit_lsn
commit_lsn_before_migration = get_commit_lsn(sk_set)
# Make two migrations, so the lagging sk stays in the sk_set, but other sks are replaced.
new_sk_set1 = [sk_set[0], sk_set[1], other_sk] # remove sk_set[2], add other_sk
new_sk_set2 = [sk_set[0], other_sk, sk_set[2]] # remove sk_set[1], add sk_set[2] back
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, new_sk_set1
)
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, new_sk_set2
)
commit_lsn_after_migration = get_commit_lsn(new_sk_set2)
# We should not lose committed WAL.
# If we have choosen the lagging sk to pull the timeline from, this might fail.
assert commit_lsn_before_migration <= commit_lsn_after_migration
ep.start(safekeeper_generation=5, safekeepers=new_sk_set2)
assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)]
def test_abort_safekeeper_migration(neon_env_builder: NeonEnvBuilder):
"""
Test that safekeeper migration can be aborted.
1. Insert failpoints and ensure the abort successfully reverts the timeline state.
2. Check that endpoint is operational after the abort.
"""
neon_env_builder.num_safekeepers = 2
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
"timeline_safekeeper_count": 1,
}
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS)
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert len(mconf["sk_set"]) == 1
cur_sk = mconf["sk_set"][0]
cur_gen = 1
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
ep.start(safekeeper_generation=1, safekeepers=mconf["sk_set"])
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE t(a int)")
ep.safe_psql("INSERT INTO t VALUES (1)")
another_sk = [sk.id for sk in env.safekeepers if sk.id != cur_sk][0]
failpoints = [
"sk-migration-after-step-3",
"sk-migration-after-step-4",
"sk-migration-after-step-5",
"sk-migration-after-step-7",
]
for fp in failpoints:
env.storage_controller.configure_failpoints((fp, "return(1)"))
with pytest.raises(StorageControllerApiException, match=f"failpoint {fp}"):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [another_sk]
)
cur_gen += 1
env.storage_controller.configure_failpoints((fp, "off"))
# We should have a joint mconf after the failure.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == cur_gen
assert mconf["sk_set"] == [cur_sk]
assert mconf["new_sk_set"] == [another_sk]
env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline)
cur_gen += 1
# Abort should revert the timeline to the previous sk_set and increment the generation.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == cur_gen
assert mconf["sk_set"] == [cur_sk]
assert mconf["new_sk_set"] is None
assert ep.safe_psql("SHOW neon.safekeepers")[0][0].startswith(f"g#{cur_gen}:")
ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})")
# After step-8 the final mconf is committed and the migration is not abortable anymore.
# So the abort should not abort anything.
env.storage_controller.configure_failpoints(("sk-migration-after-step-8", "return(1)"))
with pytest.raises(StorageControllerApiException, match="failpoint sk-migration-after-step-8"):
env.storage_controller.migrate_safekeepers(
env.initial_tenant, env.initial_timeline, [another_sk]
)
cur_gen += 2
env.storage_controller.configure_failpoints((fp, "off"))
env.storage_controller.abort_safekeeper_migration(env.initial_tenant, env.initial_timeline)
# The migration is fully committed, no abort should have been performed.
mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
assert mconf["generation"] == cur_gen
assert mconf["sk_set"] == [another_sk]
assert mconf["new_sk_set"] is None
ep.safe_psql(f"INSERT INTO t VALUES ({cur_gen})")
ep.clear_buffers()
assert ep.safe_psql("SELECT * FROM t") == [(i + 1,) for i in range(cur_gen) if i % 2 == 0]