From 4d3b28bd2eecaf38b8700281e17d027b7920fdc3 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 30 Jul 2025 23:34:30 +0200 Subject: [PATCH 1/7] [Hadron] Always run databricks auth hook. (#12683) --- vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/postgres-v17 | 2 +- vendor/revisions.json | 8 ++++---- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index c9f9fdd011..2155cb165d 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit c9f9fdd0113b52c0bd535afdb09d3a543aeee25f +Subproject commit 2155cb165d05f617eb2c8ad7e43367189b627703 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index aaaeff2550..2aaab3bb4a 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit aaaeff2550d5deba58847f112af9b98fa3a58b00 +Subproject commit 2aaab3bb4a13557aae05bb2ae0ef0a132d0c4f85 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 9b9cb4b3e3..a42351fcd4 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 9b9cb4b3e33347aea8f61e606bb6569979516de5 +Subproject commit a42351fcd41ea01edede1daed65f651e838988fc diff --git a/vendor/postgres-v17 b/vendor/postgres-v17 index fa1788475e..1e01fcea2a 160000 --- a/vendor/postgres-v17 +++ b/vendor/postgres-v17 @@ -1 +1 @@ -Subproject commit fa1788475e3146cc9c7c6a1b74f48fd296898fcd +Subproject commit 1e01fcea2a6b38180021aa83e0051d95286d9096 diff --git a/vendor/revisions.json b/vendor/revisions.json index 7212c9f7c7..c02c748a72 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,18 +1,18 @@ { "v17": [ "17.5", - "fa1788475e3146cc9c7c6a1b74f48fd296898fcd" + "1e01fcea2a6b38180021aa83e0051d95286d9096" ], "v16": [ "16.9", - "9b9cb4b3e33347aea8f61e606bb6569979516de5" + "a42351fcd41ea01edede1daed65f651e838988fc" ], "v15": [ "15.13", - "aaaeff2550d5deba58847f112af9b98fa3a58b00" + "2aaab3bb4a13557aae05bb2ae0ef0a132d0c4f85" ], "v14": [ "14.18", - "c9f9fdd0113b52c0bd535afdb09d3a543aeee25f" + "2155cb165d05f617eb2c8ad7e43367189b627703" ] } From 01c39f378e0a81037ead9108ea83feb146310de8 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Wed, 30 Jul 2025 23:05:51 +0100 Subject: [PATCH 2/7] prewarm cancellation (#12785) Add DELETE /lfc/prewarm route which handles ongoing prewarm cancellation, update API spec, add prewarm Cancelled state Add offload Cancelled state when LFC is not initialized --- compute_tools/src/compute.rs | 3 + compute_tools/src/compute_prewarm.rs | 141 ++++++++++++++--------- compute_tools/src/http/openapi_spec.yaml | 11 +- compute_tools/src/http/routes/lfc.rs | 5 + compute_tools/src/http/server.rs | 7 +- libs/compute_api/src/responses.rs | 6 + test_runner/fixtures/endpoint/http.py | 27 +++-- test_runner/regress/test_lfc_prewarm.py | 94 ++++++++++++--- 8 files changed, 208 insertions(+), 86 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index f553bf3c1e..5b1cdb9805 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -32,6 +32,7 @@ use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::{Duration, Instant}; use std::{env, fs}; use tokio::{spawn, sync::watch, task::JoinHandle, time}; +use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, instrument, warn}; use url::Url; use utils::id::{TenantId, TimelineId}; @@ -192,6 +193,7 @@ pub struct ComputeState { pub startup_span: Option, pub lfc_prewarm_state: LfcPrewarmState, + pub 
lfc_prewarm_token: CancellationToken, pub lfc_offload_state: LfcOffloadState, /// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if @@ -217,6 +219,7 @@ impl ComputeState { lfc_offload_state: LfcOffloadState::default(), terminate_flush_lsn: None, promote_state: None, + lfc_prewarm_token: CancellationToken::new(), } } diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs index 97e62c1c80..82cb28f1ac 100644 --- a/compute_tools/src/compute_prewarm.rs +++ b/compute_tools/src/compute_prewarm.rs @@ -7,7 +7,8 @@ use http::StatusCode; use reqwest::Client; use std::mem::replace; use std::sync::Arc; -use tokio::{io::AsyncReadExt, spawn}; +use tokio::{io::AsyncReadExt, select, spawn}; +use tokio_util::sync::CancellationToken; use tracing::{error, info}; #[derive(serde::Serialize, Default)] @@ -92,34 +93,35 @@ impl ComputeNode { /// If there is a prewarm request ongoing, return `false`, `true` otherwise. /// Has a failpoint "compute-prewarm" pub fn prewarm_lfc(self: &Arc, from_endpoint: Option) -> bool { + let token: CancellationToken; { - let state = &mut self.state.lock().unwrap().lfc_prewarm_state; - if let LfcPrewarmState::Prewarming = replace(state, LfcPrewarmState::Prewarming) { + let state = &mut self.state.lock().unwrap(); + token = state.lfc_prewarm_token.clone(); + if let LfcPrewarmState::Prewarming = + replace(&mut state.lfc_prewarm_state, LfcPrewarmState::Prewarming) + { return false; } } crate::metrics::LFC_PREWARMS.inc(); - let cloned = self.clone(); + let this = self.clone(); spawn(async move { - let state = match cloned.prewarm_impl(from_endpoint).await { - Ok(true) => LfcPrewarmState::Completed, - Ok(false) => { - info!( - "skipping LFC prewarm because LFC state is not found in endpoint storage" - ); - LfcPrewarmState::Skipped - } + let prewarm_state = match this.prewarm_impl(from_endpoint, token).await { + Ok(state) => state, Err(err) => { crate::metrics::LFC_PREWARM_ERRORS.inc(); error!(%err, "could not prewarm LFC"); - LfcPrewarmState::Failed { - error: format!("{err:#}"), - } + let error = format!("{err:#}"); + LfcPrewarmState::Failed { error } } }; - cloned.state.lock().unwrap().lfc_prewarm_state = state; + let state = &mut this.state.lock().unwrap(); + if let LfcPrewarmState::Cancelled = prewarm_state { + state.lfc_prewarm_token = CancellationToken::new(); + } + state.lfc_prewarm_state = prewarm_state; }); true } @@ -132,47 +134,70 @@ impl ComputeNode { /// Request LFC state from endpoint storage and load corresponding pages into Postgres. /// Returns a result with `false` if the LFC state is not found in endpoint storage. - async fn prewarm_impl(&self, from_endpoint: Option) -> Result { - let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?; + async fn prewarm_impl( + &self, + from_endpoint: Option, + token: CancellationToken, + ) -> Result { + let EndpointStoragePair { + url, + token: storage_token, + } = self.endpoint_storage_pair(from_endpoint)?; #[cfg(feature = "testing")] - fail::fail_point!("compute-prewarm", |_| { - bail!("prewarm configured to fail because of a failpoint") - }); + fail::fail_point!("compute-prewarm", |_| bail!("compute-prewarm failpoint")); info!(%url, "requesting LFC state from endpoint storage"); - let request = Client::new().get(&url).bearer_auth(token); - let res = request.send().await.context("querying endpoint storage")?; - match res.status() { + let request = Client::new().get(&url).bearer_auth(storage_token); + let response = select! 
{ + _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled), + response = request.send() => response + } + .context("querying endpoint storage")?; + + match response.status() { StatusCode::OK => (), - StatusCode::NOT_FOUND => { - return Ok(false); - } + StatusCode::NOT_FOUND => return Ok(LfcPrewarmState::Skipped), status => bail!("{status} querying endpoint storage"), } let mut uncompressed = Vec::new(); - let lfc_state = res - .bytes() - .await - .context("getting request body from endpoint storage")?; - ZstdDecoder::new(lfc_state.iter().as_slice()) - .read_to_end(&mut uncompressed) - .await - .context("decoding LFC state")?; + let lfc_state = select! { + _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled), + lfc_state = response.bytes() => lfc_state + } + .context("getting request body from endpoint storage")?; + + let mut decoder = ZstdDecoder::new(lfc_state.iter().as_slice()); + select! { + _ = token.cancelled() => return Ok(LfcPrewarmState::Cancelled), + read = decoder.read_to_end(&mut uncompressed) => read + } + .context("decoding LFC state")?; + let uncompressed_len = uncompressed.len(); + info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}"); - info!(%url, "downloaded LFC state, uncompressed size {uncompressed_len}, loading into Postgres"); - - ComputeNode::get_maintenance_client(&self.tokio_conn_conf) + // Client connection and prewarm info querying are fast and therefore don't need + // cancellation + let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf) .await - .context("connecting to postgres")? - .query_one("select neon.prewarm_local_cache($1)", &[&uncompressed]) - .await - .context("loading LFC state into postgres") - .map(|_| ())?; + .context("connecting to postgres")?; + let pg_token = client.cancel_token(); - Ok(true) + let params: Vec<&(dyn postgres_types::ToSql + Sync)> = vec![&uncompressed]; + select! 
{ + res = client.query_one("select neon.prewarm_local_cache($1)", ¶ms) => res, + _ = token.cancelled() => { + pg_token.cancel_query(postgres::NoTls).await + .context("cancelling neon.prewarm_local_cache()")?; + return Ok(LfcPrewarmState::Cancelled) + } + } + .context("loading LFC state into postgres") + .map(|_| ())?; + + Ok(LfcPrewarmState::Completed) } /// If offload request is ongoing, return false, true otherwise @@ -200,20 +225,20 @@ impl ComputeNode { async fn offload_lfc_with_state_update(&self) { crate::metrics::LFC_OFFLOADS.inc(); - - let Err(err) = self.offload_lfc_impl().await else { - self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed; - return; + let state = match self.offload_lfc_impl().await { + Ok(state) => state, + Err(err) => { + crate::metrics::LFC_OFFLOAD_ERRORS.inc(); + error!(%err, "could not offload LFC"); + let error = format!("{err:#}"); + LfcOffloadState::Failed { error } + } }; - crate::metrics::LFC_OFFLOAD_ERRORS.inc(); - error!(%err, "could not offload LFC state to endpoint storage"); - self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed { - error: format!("{err:#}"), - }; + self.state.lock().unwrap().lfc_offload_state = state; } - async fn offload_lfc_impl(&self) -> Result<()> { + async fn offload_lfc_impl(&self) -> Result { let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?; info!(%url, "requesting LFC state from Postgres"); @@ -228,7 +253,7 @@ impl ComputeNode { .context("deserializing LFC state")?; let Some(state) = state else { info!(%url, "empty LFC state, not exporting"); - return Ok(()); + return Ok(LfcOffloadState::Skipped); }; let mut compressed = Vec::new(); @@ -242,7 +267,7 @@ impl ComputeNode { let request = Client::new().put(url).bearer_auth(token).body(compressed); match request.send().await { - Ok(res) if res.status() == StatusCode::OK => Ok(()), + Ok(res) if res.status() == StatusCode::OK => Ok(LfcOffloadState::Completed), Ok(res) => bail!( "Request to endpoint storage failed with status: {}", res.status() @@ -250,4 +275,8 @@ impl ComputeNode { Err(err) => Err(err).context("writing to endpoint storage"), } } + + pub fn cancel_prewarm(self: &Arc) { + self.state.lock().unwrap().lfc_prewarm_token.cancel(); + } } diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index ab729d62b5..27e610a87d 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -139,6 +139,15 @@ paths: application/json: schema: $ref: "#/components/schemas/LfcPrewarmState" + delete: + tags: + - Prewarm + summary: Cancel ongoing LFC prewarm + description: "" + operationId: cancelLfcPrewarm + responses: + 202: + description: Prewarm cancelled /lfc/offload: post: @@ -636,7 +645,7 @@ components: properties: status: description: LFC offload status - enum: [not_offloaded, offloading, completed, failed] + enum: [not_offloaded, offloading, completed, skipped, failed] type: string error: description: LFC offload error, if any diff --git a/compute_tools/src/http/routes/lfc.rs b/compute_tools/src/http/routes/lfc.rs index e98bd781a2..7483198723 100644 --- a/compute_tools/src/http/routes/lfc.rs +++ b/compute_tools/src/http/routes/lfc.rs @@ -46,3 +46,8 @@ pub(in crate::http) async fn offload(compute: Compute) -> Response { ) } } + +pub(in crate::http) async fn cancel_prewarm(compute: Compute) -> StatusCode { + compute.cancel_prewarm(); + StatusCode::ACCEPTED +} diff --git a/compute_tools/src/http/server.rs 
b/compute_tools/src/http/server.rs index 2fd3121f4f..869fdef11d 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -99,7 +99,12 @@ impl From<&Server> for Router> { ); let authenticated_router = Router::>::new() - .route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm)) + .route( + "/lfc/prewarm", + get(lfc::prewarm_state) + .post(lfc::prewarm) + .delete(lfc::cancel_prewarm), + ) .route("/lfc/offload", get(lfc::offload_state).post(lfc::offload)) .route("/promote", post(promote::promote)) .route("/check_writability", post(check_writability::is_writable)) diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index a27301e45e..a918644e4c 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -68,11 +68,15 @@ pub enum LfcPrewarmState { /// We tried to fetch the corresponding LFC state from the endpoint storage, /// but received `Not Found 404`. This should normally happen only during the /// first endpoint start after creation with `autoprewarm: true`. + /// This may also happen if LFC is turned off or not initialized /// /// During the orchestrated prewarm via API, when a caller explicitly /// provides the LFC state key to prewarm from, it's the caller responsibility /// to handle this status as an error state in this case. Skipped, + /// LFC prewarm was cancelled. Some pages in LFC cache may be prewarmed if query + /// has started working before cancellation + Cancelled, } impl Display for LfcPrewarmState { @@ -83,6 +87,7 @@ impl Display for LfcPrewarmState { LfcPrewarmState::Completed => f.write_str("Completed"), LfcPrewarmState::Skipped => f.write_str("Skipped"), LfcPrewarmState::Failed { error } => write!(f, "Error({error})"), + LfcPrewarmState::Cancelled => f.write_str("Cancelled"), } } } @@ -97,6 +102,7 @@ pub enum LfcOffloadState { Failed { error: String, }, + Skipped, } #[derive(Serialize, Debug, Clone, PartialEq)] diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index d235ac2143..c77a372017 100644 --- a/test_runner/fixtures/endpoint/http.py +++ b/test_runner/fixtures/endpoint/http.py @@ -78,20 +78,26 @@ class EndpointHttpClient(requests.Session): json: dict[str, str] = res.json() return json - def prewarm_lfc(self, from_endpoint_id: str | None = None): + def prewarm_lfc(self, from_endpoint_id: str | None = None) -> dict[str, str]: """ Prewarm LFC cache from given endpoint and wait till it finishes or errors """ params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict() self.post(self.prewarm_url, params=params).raise_for_status() - self.prewarm_lfc_wait() + return self.prewarm_lfc_wait() - def prewarm_lfc_wait(self): + def cancel_prewarm_lfc(self): + """ + Cancel LFC prewarm if any is ongoing + """ + self.delete(self.prewarm_url).raise_for_status() + + def prewarm_lfc_wait(self) -> dict[str, str]: """ Wait till LFC prewarm returns with error or success. 
If prewarm was not requested before calling this function, it will error """ - statuses = "failed", "completed", "skipped" + statuses = "failed", "completed", "skipped", "cancelled" def prewarmed(): json = self.prewarm_lfc_status() @@ -101,6 +107,7 @@ class EndpointHttpClient(requests.Session): wait_until(prewarmed, timeout=60) res = self.prewarm_lfc_status() assert res["status"] != "failed", res + return res def offload_lfc_status(self) -> dict[str, str]: res = self.get(self.offload_url) @@ -108,29 +115,31 @@ class EndpointHttpClient(requests.Session): json: dict[str, str] = res.json() return json - def offload_lfc(self): + def offload_lfc(self) -> dict[str, str]: """ Offload LFC cache to endpoint storage and wait till offload finishes or errors """ self.post(self.offload_url).raise_for_status() - self.offload_lfc_wait() + return self.offload_lfc_wait() - def offload_lfc_wait(self): + def offload_lfc_wait(self) -> dict[str, str]: """ Wait till LFC offload returns with error or success. If offload was not requested before calling this function, it will error """ + statuses = "failed", "completed", "skipped" def offloaded(): json = self.offload_lfc_status() status, err = json["status"], json.get("error") - assert status in ["failed", "completed"], f"{status}, {err=}" + assert status in statuses, f"{status}, {err=}" wait_until(offloaded, timeout=60) res = self.offload_lfc_status() assert res["status"] != "failed", res + return res - def promote(self, promote_spec: dict[str, Any], disconnect: bool = False): + def promote(self, promote_spec: dict[str, Any], disconnect: bool = False) -> dict[str, str]: url = f"http://localhost:{self.external_port}/promote" if disconnect: try: # send first request to start promote and disconnect diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index 2bbe8c3e97..a96f18177c 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,6 +1,6 @@ import random -import threading from enum import StrEnum +from threading import Thread from time import sleep from typing import Any @@ -47,19 +47,23 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) # With autoprewarm, we need to be sure LFC was offloaded after all writes # finish, so we sleep. 
Otherwise we'll have less prewarmed pages than we want sleep(AUTOOFFLOAD_INTERVAL_SECS) - client.offload_lfc_wait() - return + offload_res = client.offload_lfc_wait() + log.info(offload_res) + return offload_res if method == PrewarmMethod.COMPUTE_CTL: status = client.prewarm_lfc_status() assert status["status"] == "not_prewarmed" assert "error" not in status - client.offload_lfc() + offload_res = client.offload_lfc() + log.info(offload_res) assert client.prewarm_lfc_status()["status"] == "not_prewarmed" + parsed = prom_parse(client) desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0} assert parsed == desired, f"{parsed=} != {desired=}" - return + + return offload_res raise AssertionError(f"{method} not in PrewarmMethod") @@ -68,21 +72,30 @@ def prewarm_endpoint( method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor, lfc_state: str | None ): if method == PrewarmMethod.AUTOPREWARM: - client.prewarm_lfc_wait() + prewarm_res = client.prewarm_lfc_wait() + log.info(prewarm_res) elif method == PrewarmMethod.COMPUTE_CTL: - client.prewarm_lfc() + prewarm_res = client.prewarm_lfc() + log.info(prewarm_res) + return prewarm_res elif method == PrewarmMethod.POSTGRES: cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,)) -def check_prewarmed( +def check_prewarmed_contains( method: PrewarmMethod, client: EndpointHttpClient, desired_status: dict[str, str | int] ): if method == PrewarmMethod.AUTOPREWARM: - assert client.prewarm_lfc_status() == desired_status + prewarm_status = client.prewarm_lfc_status() + for k in desired_status: + assert desired_status[k] == prewarm_status[k] + assert prom_parse(client)[PREWARM_LABEL] == 1 elif method == PrewarmMethod.COMPUTE_CTL: - assert client.prewarm_lfc_status() == desired_status + prewarm_status = client.prewarm_lfc_status() + for k in desired_status: + assert desired_status[k] == prewarm_status[k] + desired = {OFFLOAD_LABEL: 0, PREWARM_LABEL: 1, PREWARM_ERR_LABEL: 0, OFFLOAD_ERR_LABEL: 0} assert prom_parse(client) == desired @@ -149,9 +162,6 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): log.info(f"Used LFC size: {lfc_used_pages}") pg_cur.execute("select * from neon.get_prewarm_info()") total, prewarmed, skipped, _ = pg_cur.fetchall()[0] - log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}") - progress = (prewarmed + skipped) * 100 // total - log.info(f"Prewarm progress: {progress}%") assert lfc_used_pages > 10000 assert total > 0 assert prewarmed > 0 @@ -161,7 +171,54 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped} - check_prewarmed(method, client, desired) + check_prewarmed_contains(method, client, desired) + + +@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") +def test_lfc_prewarm_cancel(neon_simple_env: NeonEnv): + """ + Test we can cancel LFC prewarm and prewarm successfully after + """ + env = neon_simple_env + n_records = 1000000 + cfg = [ + "autovacuum = off", + "shared_buffers=1MB", + "neon.max_file_cache_size=1GB", + "neon.file_cache_size_limit=1GB", + "neon.file_cache_prewarm_limit=1000", + ] + endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg) + + pg_conn = endpoint.connect() + pg_cur = pg_conn.cursor() + pg_cur.execute("create schema neon; create extension neon with schema neon") + pg_cur.execute("create database lfc") + + lfc_conn 
= endpoint.connect(dbname="lfc") + lfc_cur = lfc_conn.cursor() + log.info(f"Inserting {n_records} rows") + lfc_cur.execute("create table t(pk integer primary key, payload text default repeat('?', 128))") + lfc_cur.execute(f"insert into t (pk) values (generate_series(1,{n_records}))") + log.info(f"Inserted {n_records} rows") + + client = endpoint.http_client() + method = PrewarmMethod.COMPUTE_CTL + offload_lfc(method, client, pg_cur) + + endpoint.stop() + endpoint.start() + + thread = Thread(target=lambda: prewarm_endpoint(method, client, pg_cur, None)) + thread.start() + # wait 2 seconds to ensure we cancel prewarm SQL query + sleep(2) + client.cancel_prewarm_lfc() + thread.join() + assert client.prewarm_lfc_status()["status"] == "cancelled" + + prewarm_endpoint(method, client, pg_cur, None) + assert client.prewarm_lfc_status()["status"] == "completed" @pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping") @@ -178,9 +235,8 @@ def test_lfc_prewarm_empty(neon_simple_env: NeonEnv): cur = conn.cursor() cur.execute("create schema neon; create extension neon with schema neon") method = PrewarmMethod.COMPUTE_CTL - offload_lfc(method, client, cur) - prewarm_endpoint(method, client, cur, None) - assert client.prewarm_lfc_status()["status"] == "skipped" + assert offload_lfc(method, client, cur)["status"] == "skipped" + assert prewarm_endpoint(method, client, cur, None)["status"] == "skipped" # autoprewarm isn't needed as we prewarm manually @@ -251,11 +307,11 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet workload_threads = [] for _ in range(n_threads): - t = threading.Thread(target=workload) + t = Thread(target=workload) workload_threads.append(t) t.start() - prewarm_thread = threading.Thread(target=prewarm) + prewarm_thread = Thread(target=prewarm) prewarm_thread.start() def prewarmed(): From 975b95f4cd9db3d25587f265f0587ba4fa123ce5 Mon Sep 17 00:00:00 2001 From: Aleksandr Sarantsev <99037063+ephemeralsad@users.noreply.github.com> Date: Thu, 31 Jul 2025 12:34:47 +0400 Subject: [PATCH 3/7] Introduce deletion API improvement RFC (#12484) ## Problem The deletion logic had become difficult to understand and maintain. ## Summary of changes - Added an RFC detailing proposed improvements to all deletion-related APIs. --------- Co-authored-by: Aleksandr Sarantsev --- ...025-07-07-node-deletion-api-improvement.md | 246 ++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 docs/rfcs/2025-07-07-node-deletion-api-improvement.md diff --git a/docs/rfcs/2025-07-07-node-deletion-api-improvement.md b/docs/rfcs/2025-07-07-node-deletion-api-improvement.md new file mode 100644 index 0000000000..47dadaee35 --- /dev/null +++ b/docs/rfcs/2025-07-07-node-deletion-api-improvement.md @@ -0,0 +1,246 @@ +# Node deletion API improvement + +Created on 2025-07-07 +Implemented on _TBD_ + +## Summary + +This RFC describes improvements to the storage controller API for gracefully deleting pageserver +nodes. + +## Motivation + +The basic node deletion API introduced in [#8226](https://github.com/neondatabase/neon/issues/8333) +has several limitations: + +- Deleted nodes can re-add themselves if they restart (e.g., a flaky node that keeps restarting and +we cannot reach via SSH to stop the pageserver). This issue has been resolved by tombstone +mechanism in [#12036](https://github.com/neondatabase/neon/issues/12036) +- Process of node deletion is not graceful, i.e. 
it just imitates a node failure + +In this context, "graceful" node deletion means that users do not experience any disruption or +negative effects, provided the system remains in a healthy state (i.e., the remaining pageservers +can handle the workload and all requirements are met). To achieve this, the system must perform +live migration of all tenant shards from the node being deleted while the node is still running +and continue processing all incoming requests. The node is removed only after all tenant shards +have been safely migrated. + +Although live migrations can be achieved with the drain functionality, it leads to incorrect shard +placement, such as not matching availability zones. This results in unnecessary work to optimize +the placement that was just recently performed. + +If we delete a node before its tenant shards are fully moved, the new node won't have all the +needed data (e.g. heatmaps) ready. This means user requests to the new node will be much slower at +first. If there are many tenant shards, this slowdown affects a huge amount of users. + +Graceful node deletion is more complicated and can introduce new issues. It takes longer because +live migration of each tenant shard can last several minutes. Using non-blocking accessors may +also cause deletion to wait if other processes are holding inner state lock. It also gets trickier +because we need to handle other requests, like drain and fill, at the same time. + +## Impacted components (e.g. pageserver, safekeeper, console, etc) + +- storage controller +- pageserver (indirectly) + +## Proposed implementation + +### Tombstones + +To resolve the problem of deleted nodes re-adding themselves, a tombstone mechanism was introduced +as part of the node stored information. Each node has a separate `NodeLifecycle` field with two +possible states: `Active` and `Deleted`. When node deletion completes, the database row is not +deleted but instead has its `NodeLifecycle` column switched to `Deleted`. Nodes with `Deleted` +lifecycle are treated as if the row is absent for most handlers, with several exceptions: reattach +and register functionality must be aware of tombstones. Additionally, new debug handlers are +available for listing and deleting tombstones via the `/debug/v1/tombstone` path. + +### Gracefulness + +The problem of making node deletion graceful is complex and involves several challenges: + +- **Cancellable**: The operation must be cancellable to allow administrators to abort the process +if needed, e.g. if run by mistake. +- **Non-blocking**: We don't want to block deployment operations like draining/filling on the node +deletion process. We need clear policies for handling concurrent operations: what happens when a +drain/fill request arrives while deletion is in progress, and what happens when a delete request +arrives while drain/fill is in progress. +- **Persistent**: If the storage controller restarts during this long-running operation, we must +preserve progress and automatically resume the deletion process after the storage controller +restarts. +- **Migrated correctly**: We cannot simply use the existing drain mechanism for nodes scheduled +for deletion, as this would move shards to irrelevant locations. The drain process expects the +node to return, so it only moves shards to backup locations, not to their preferred AZs. It also +leaves secondary locations unmoved. This could result in unnecessary load on the storage +controller and inefficient resource utilization. 
+- **Force option**: Administrators need the ability to force immediate, non-graceful deletion when +time constraints or emergency situations require it, bypassing the normal graceful migration +process. + +See below for a detailed breakdown of the proposed changes and mechanisms. + +#### Node lifecycle + +New `NodeLifecycle` enum and a matching database field with these values: +- `Active`: The normal state. All operations are allowed. +- `ScheduledForDeletion`: The node is marked to be deleted soon. Deletion may be in progress or +will happen later, but the node will eventually be removed. All operations are allowed. +- `Deleted`: The node is fully deleted. No operations are allowed, and the node cannot be brought +back. The only action left is to remove its record from the database. Any attempt to register a +node in this state will fail. + +This state persists across storage controller restarts. + +**State transition** +``` + +--------------------+ + +---| Active |<---------------------+ + | +--------------------+ | + | ^ | + | start_node_delete | cancel_node_delete | + v | | + +----------------------------------+ | + | ScheduledForDeletion | | + +----------------------------------+ | + | | + | node_register | + | | + | delete_node (at the finish) | + | | + v | + +---------+ tombstone_delete +----------+ + | Deleted |-------------------------------->| no row | + +---------+ +----------+ +``` + +#### NodeSchedulingPolicy::Deleting + +A `Deleting` variant to the `NodeSchedulingPolicy` enum. This means the deletion function is +running for the node right now. Only one node can have the `Deleting` policy at a time. + +The `NodeSchedulingPolicy::Deleting` state is persisted in the database. However, after a storage +controller restart, any node previously marked as `Deleting` will have its scheduling policy reset +to `Pause`. The policy will only transition back to `Deleting` when the deletion operation is +actively started again, as triggered by the node's `NodeLifecycle::ScheduledForDeletion` state. + +`NodeSchedulingPolicy` transition details: +1. When `node_delete` begins, set the policy to `NodeSchedulingPolicy::Deleting`. +2. If `node_delete` is cancelled (for example, due to a concurrent drain operation), revert the +policy to its previous value. The policy is persisted in storcon DB. +3. After `node_delete` completes, the final value of the scheduling policy is irrelevant, since +`NodeLifecycle::Deleted` prevents any further access to this field. + +The deletion process cannot be initiated for nodes currently undergoing deployment-related +operations (`Draining`, `Filling`, or `PauseForRestart` policies). Deletion will only be triggered +once the node transitions to either the `Active` or `Pause` state. + +#### OperationTracker + +A replacement for `Option ongoing_operation`, the `OperationTracker` is a +dedicated service state object responsible for managing all long-running node operations (drain, +fill, delete) with robust concurrency control. + +Key responsibilities: +- Orchestrates the execution of operations +- Supports cancellation of currently running operations +- Enforces operation constraints, e.g. allowing only single drain/fill operation at a time +- Persists deletion state, enabling recovery of pending deletions across restarts +- Ensures thread safety across concurrent requests + +#### Attached tenant shard processing + +When deleting a node, handle each attached tenant shard as follows: + +1. Pick the best node to become the new attached (the candidate). +2. 
If the candidate already has this shard as a secondary: + - Create a new secondary for the shard on another suitable node. + Otherwise: + - Create a secondary for the shard on the candidate node. +3. Wait until all secondaries are ready and pre-warmed. +4. Promote the candidate's secondary to attached. +5. Remove the secondary from the node being deleted. + +This process safely moves all attached shards before deleting the node. + +#### Secondary tenant shard processing + +When deleting a node, handle each secondary tenant shard as follows: + +1. Choose the best node to become the new secondary. +2. Create a secondary for the shard on that node. +3. Wait until the new secondary is ready. +4. Remove the secondary from the node being deleted. + +This ensures all secondary shards are safely moved before deleting the node. + +### Reliability, failure modes and corner cases + +In case of a storage controller failure and following restart, the system behavior depends on the +`NodeLifecycle` state: + +- If `NodeLifecycle` is `Active`: No action is taken for this node. +- If `NodeLifecycle` is `Deleted`: The node will not be re-added. +- If `NodeLifecycle` is `ScheduledForDeletion`: A deletion background task will be launched for +this node. + +In case of a pageserver node failure during deletion, the behavior depends on the `force` flag: +- If `force` is set: The node deletion will proceed regardless of the node's availability. +- If `force` is not set: The deletion will be retried a limited number of times. If the node +remains unavailable, the deletion process will pause and automatically resume when the node +becomes healthy again. + +### Operations concurrency + +The following sections describe the behavior when different types of requests arrive at the storage +controller and how they interact with ongoing operations. + +#### Delete request + +Handler: `PUT /control/v1/node/:node_id/delete` + +1. If node lifecycle is `NodeLifecycle::ScheduledForDeletion`: + - Return `200 OK`: there is already an ongoing deletion request for this node +2. Update & persist lifecycle to `NodeLifecycle::ScheduledForDeletion` +3. Persist current scheduling policy +4. If there is no active operation (drain/fill/delete): + - Run deletion process for this node + +#### Cancel delete request + +Handler: `DELETE /control/v1/node/:node_id/delete` + +1. If node lifecycle is not `NodeLifecycle::ScheduledForDeletion`: + - Return `404 Not Found`: there is no current deletion request for this node +2. If the active operation is deleting this node, cancel it +3. Update & persist lifecycle to `NodeLifecycle::Active` +4. Restore the last scheduling policy from persistence + +#### Drain/fill request + +1. If there are already ongoing drain/fill processes: + - Return `409 Conflict`: queueing of drain/fill processes is not supported +2. If there is an ongoing delete process: + - Cancel it and wait until it is cancelled +3. Run the drain/fill process +4. After the drain/fill process is cancelled or finished: + - Try to find another candidate to delete and run the deletion process for that node + +#### Drain/fill cancel request + +1. If the active operation is not the related process: + - Return `400 Bad Request`: cancellation request is incorrect, operations are not the same +2. Cancel the active operation +3. 
Try to find another candidate to delete and run the deletion process for that node + +## Definition of Done + +- [x] Fix flaky node scenario and introduce related debug handlers +- [ ] Node deletion intent is persistent - a node will be eventually deleted after a deletion +request regardless of draining/filling requests and restarts +- [ ] Node deletion can be graceful - deletion completes only after moving all tenant shards to +recommended locations +- [ ] Deploying does not break due to long deletions - drain/fill operations override deletion +process and deletion resumes after drain/fill completes +- [ ] `force` flag is implemented and provides fast, failure-tolerant node removal (e.g., when a +pageserver node does not respond) +- [ ] Legacy delete handler code is removed from storage_controller, test_runner, and storcon_cli From edd60730c89433dd8b6990bdaf9fdad34213c411 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Thu, 31 Jul 2025 13:29:25 +0400 Subject: [PATCH 4/7] safekeeper: use last_log_term in mconf switch + choose most advanced sk in pull timeline (#12778) ## Problem I discovered two bugs corresponding to safekeeper migration, which together might lead to a data loss during the migration. The second bug is from a hadron patch and might lead to a data loss during the safekeeper restore in hadron as well. 1. `switch_membership` returns the current `term` instead of `last_log_term`. It is used to choose the `sync_position` in the algorithm, so we might choose the wrong one and break the correctness guarantees. 2. The current `term` is used to choose the most advanced SK in `pull_timeline` with higher priority than `flush_lsn`. It is incorrect because the most advanced safekeeper is the one with the highest `(last_log_term, flush_lsn)` pair. 
The compute might bump term on the least advanced sk, making it the best choice to pull from, and thus making committed log entries "uncommitted" after `pull_timeline` Part of https://databricks.atlassian.net/browse/LKB-1017 ## Summary of changes - Return `last_log_term` in `switch_membership` - Use `(last_log_term, flush_lsn)` as a primary key for choosing the most advanced sk in `pull_timeline` and deny pulling if the `max_term` is higher than on the most advanced sk (hadron only) - Write tests for both cases - Retry `sync_safekeepers` in `compute_ctl` - Take into the account the quorum size when calculating `sync_position` --- compute_tools/src/compute.rs | 40 +++- safekeeper/src/pull_timeline.rs | 24 ++- safekeeper/src/timeline.rs | 6 +- .../src/service/safekeeper_service.rs | 42 ++++- test_runner/fixtures/neon_fixtures.py | 1 + .../regress/test_safekeeper_migration.py | 174 ++++++++++++++++++ 6 files changed, 275 insertions(+), 12 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 5b1cdb9805..1df837e1e6 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -35,6 +35,9 @@ use tokio::{spawn, sync::watch, task::JoinHandle, time}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, instrument, warn}; use url::Url; +use utils::backoff::{ + DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff_duration, +}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use utils::measured_stream::MeasuredReader; @@ -1557,6 +1560,41 @@ impl ComputeNode { Ok(lsn) } + fn sync_safekeepers_with_retries(&self, storage_auth_token: Option) -> Result { + let max_retries = 5; + let mut attempts = 0; + loop { + let result = self.sync_safekeepers(storage_auth_token.clone()); + match &result { + Ok(_) => { + if attempts > 0 { + tracing::info!("sync_safekeepers succeeded after {attempts} retries"); + } + return result; + } + Err(e) if attempts < max_retries => { + tracing::info!( + "sync_safekeepers failed, will retry (attempt {attempts}): {e:#}" + ); + } + Err(err) => { + tracing::warn!( + "sync_safekeepers still failed after {attempts} retries, giving up: {err:?}" + ); + return result; + } + } + // sleep and retry + let backoff = exponential_backoff_duration( + attempts, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + ); + std::thread::sleep(backoff); + attempts += 1; + } + } + /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. #[instrument(skip_all)] @@ -1592,7 +1630,7 @@ impl ComputeNode { lsn } else { info!("starting safekeepers syncing"); - self.sync_safekeepers(pspec.storage_auth_token.clone()) + self.sync_safekeepers_with_retries(pspec.storage_auth_token.clone()) .with_context(|| "failed to sync safekeepers")? }; info!("safekeepers synced at LSN {}", lsn); diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 43232db950..4febc7656e 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -612,19 +612,25 @@ pub async fn handle_request( } } + let max_term = statuses + .iter() + .map(|(status, _)| status.acceptor_state.term) + .max() + .unwrap(); + // Find the most advanced safekeeper let (status, i) = statuses .into_iter() .max_by_key(|(status, _)| { ( status.acceptor_state.epoch, + status.flush_lsn, /* BEGIN_HADRON */ // We need to pull from the SK with the highest term. 
// This is because another compute may come online and vote the same highest term again on the other two SKs. // Then, there will be 2 computes running on the same term. status.acceptor_state.term, /* END_HADRON */ - status.flush_lsn, status.commit_lsn, ) }) @@ -634,6 +640,22 @@ pub async fn handle_request( assert!(status.tenant_id == request.tenant_id); assert!(status.timeline_id == request.timeline_id); + // TODO(diko): This is hadron only check to make sure that we pull the timeline + // from the safekeeper with the highest term during timeline restore. + // We could avoid returning the error by calling bump_term after pull_timeline. + // However, this is not a big deal because we retry the pull_timeline requests. + // The check should be removed together with removing custom hadron logic for + // safekeeper restore. + if wait_for_peer_timeline_status && status.acceptor_state.term != max_term { + return Err(ApiError::PreconditionFailed( + format!( + "choosen safekeeper {} has term {}, but the most advanced term is {}", + safekeeper_host, status.acceptor_state.term, max_term + ) + .into(), + )); + } + match pull_timeline( status, safekeeper_host, diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index e083a49428..25ac8e5bd3 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -195,12 +195,14 @@ impl StateSK { to: Configuration, ) -> Result { let result = self.state_mut().membership_switch(to).await?; + let flush_lsn = self.flush_lsn(); + let last_log_term = self.state().acceptor_state.get_last_log_term(flush_lsn); Ok(TimelineMembershipSwitchResponse { previous_conf: result.previous_conf, current_conf: result.current_conf, - last_log_term: self.state().acceptor_state.term, - flush_lsn: self.flush_lsn(), + last_log_term, + flush_lsn, }) } diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index a60ebb85c6..fab1342d5d 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -24,12 +24,12 @@ use pageserver_api::controller_api::{ }; use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo}; use safekeeper_api::PgVersionId; +use safekeeper_api::Term; use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration}; use safekeeper_api::models::{ PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse, }; -use safekeeper_api::{INITIAL_TERM, Term}; use safekeeper_client::mgmt_api; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -1298,13 +1298,7 @@ impl Service { ) .await?; - let mut sync_position = (INITIAL_TERM, Lsn::INVALID); - for res in results.into_iter().flatten() { - let sk_position = (res.last_log_term, res.flush_lsn); - if sync_position < sk_position { - sync_position = sk_position; - } - } + let sync_position = Self::get_sync_position(&results)?; tracing::info!( %generation, @@ -1598,4 +1592,36 @@ impl Service { Ok(()) } + + /// Get membership switch responses from all safekeepers and return the sync position. + /// + /// Sync position is a position equal or greater than the commit position. + /// It is guaranteed that all WAL entries with (last_log_term, flush_lsn) + /// greater than the sync position are not committed (= not on a quorum). + /// + /// Returns error if there is no quorum of successful responses. 
+ fn get_sync_position( + responses: &[mgmt_api::Result], + ) -> Result<(Term, Lsn), ApiError> { + let quorum_size = responses.len() / 2 + 1; + + let mut wal_positions = responses + .iter() + .flatten() + .map(|res| (res.last_log_term, res.flush_lsn)) + .collect::>(); + + // Should be already checked if the responses are from tenant_timeline_set_membership_quorum. + if wal_positions.len() < quorum_size { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "not enough successful responses to get sync position: {}/{}", + wal_positions.len(), + quorum_size, + ))); + } + + wal_positions.sort(); + + Ok(wal_positions[quorum_size - 1]) + } } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 11d54f7831..c3dfc78218 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2313,6 +2313,7 @@ class NeonStorageController(MetricsGetter, LogUtils): timeline_id: TimelineId, new_sk_set: list[int], ): + log.info(f"migrate_safekeepers({tenant_id}, {timeline_id}, {new_sk_set})") response = self.request( "POST", f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate", diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index 2ceeea37a5..97a6ece446 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -286,3 +286,177 @@ def test_sk_generation_aware_tombstones(neon_env_builder: NeonEnvBuilder): assert re.match(r".*Timeline .* deleted.*", exc.value.response.text) # The timeline should remain deleted. expect_deleted(second_sk) + + +def test_safekeeper_migration_stale_timeline(neon_env_builder: NeonEnvBuilder): + """ + Test that safekeeper migration handles stale timeline correctly by migrating to + a safekeeper with a stale timeline. + 1. Check that we are waiting for the stale timeline to catch up with the commit lsn. + The migration might fail if there is no compute to advance the WAL. + 2. Check that we rely on last_log_term (and not the current term) when waiting for the + sync_position on step 7. + 3. Check that migration succeeds if the compute is running. + """ + neon_env_builder.num_safekeepers = 2 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 1, + } + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) + env.storage_controller.allowed_errors.append(".*not enough successful .* to reach quorum.*") + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + + active_sk = env.get_safekeeper(mconf["sk_set"][0]) + other_sk = [sk for sk in env.safekeepers if sk.id != active_sk.id][0] + + ep = env.endpoints.create("main", tenant_id=env.initial_tenant) + ep.start(safekeeper_generation=1, safekeepers=[active_sk.id]) + ep.safe_psql("CREATE TABLE t(a int)") + ep.safe_psql("INSERT INTO t VALUES (0)") + + # Pull the timeline to other_sk, so other_sk now has a "stale" timeline on it. + other_sk.pull_timeline([active_sk], env.initial_tenant, env.initial_timeline) + + # Advance the WAL on active_sk. + ep.safe_psql("INSERT INTO t VALUES (1)") + + # The test is more tricky if we have the same last_log_term but different term/flush_lsn. + # Stop the active_sk during the endpoint shutdown because otherwise compute_ctl runs + # sync_safekeepers and advances last_log_term on active_sk. 
+ active_sk.stop() + ep.stop(mode="immediate") + active_sk.start() + + active_sk_status = active_sk.http_client().timeline_status( + env.initial_tenant, env.initial_timeline + ) + other_sk_status = other_sk.http_client().timeline_status( + env.initial_tenant, env.initial_timeline + ) + + # other_sk should have the same last_log_term, but a stale flush_lsn. + assert active_sk_status.last_log_term == other_sk_status.last_log_term + assert active_sk_status.flush_lsn > other_sk_status.flush_lsn + + commit_lsn = active_sk_status.flush_lsn + + # Bump the term on other_sk to make it higher than active_sk. + # This is to make sure we don't use current term instead of last_log_term in the algorithm. + other_sk.http_client().term_bump( + env.initial_tenant, env.initial_timeline, active_sk_status.term + 100 + ) + + # TODO(diko): now it fails because the timeline on other_sk is stale and there is no compute + # to catch up it with active_sk. It might be fixed in https://databricks.atlassian.net/browse/LKB-946 + # if we delete stale timelines before starting the migration. + # But the rest of the test is still valid: we should not lose committed WAL after the migration. + with pytest.raises( + StorageControllerApiException, match="not enough successful .* to reach quorum" + ): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [other_sk.id] + ) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["new_sk_set"] == [other_sk.id] + assert mconf["sk_set"] == [active_sk.id] + assert mconf["generation"] == 2 + + # Start the endpoint, so it advances the WAL on other_sk. + ep.start(safekeeper_generation=2, safekeepers=[active_sk.id, other_sk.id]) + # Now the migration should succeed. + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [other_sk.id] + ) + + # Check that we didn't lose committed WAL. + assert ( + other_sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline).flush_lsn + >= commit_lsn + ) + assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)] + + +def test_pull_from_most_advanced_sk(neon_env_builder: NeonEnvBuilder): + """ + Test that we pull the timeline from the most advanced safekeeper during the + migration and do not lose committed WAL. + """ + neon_env_builder.num_safekeepers = 4 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 3, + } + env = neon_env_builder.init_start() + env.pageserver.allowed_errors.extend(PAGESERVER_ALLOWED_ERRORS) + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + + sk_set = mconf["sk_set"] + assert len(sk_set) == 3 + + other_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0] + + ep = env.endpoints.create("main", tenant_id=env.initial_tenant) + ep.start(safekeeper_generation=1, safekeepers=sk_set) + ep.safe_psql("CREATE TABLE t(a int)") + ep.safe_psql("INSERT INTO t VALUES (0)") + + # Stop one sk, so we have a lagging WAL on it. + env.get_safekeeper(sk_set[0]).stop() + # Advance the WAL on the other sks. + ep.safe_psql("INSERT INTO t VALUES (1)") + + # Stop other sks to make sure compute_ctl doesn't advance the last_log_term on them during shutdown. + for sk_id in sk_set[1:]: + env.get_safekeeper(sk_id).stop() + ep.stop(mode="immediate") + for sk_id in sk_set: + env.get_safekeeper(sk_id).start() + + # Bump the term on the lagging sk to make sure we don't use it to choose the most advanced sk. 
+ env.get_safekeeper(sk_set[0]).http_client().term_bump( + env.initial_tenant, env.initial_timeline, 100 + ) + + def get_commit_lsn(sk_set: list[int]): + flush_lsns = [] + last_log_terms = [] + for sk_id in sk_set: + sk = env.get_safekeeper(sk_id) + status = sk.http_client().timeline_status(env.initial_tenant, env.initial_timeline) + flush_lsns.append(status.flush_lsn) + last_log_terms.append(status.last_log_term) + + # In this test we assume that all sks have the same last_log_term. + assert len(set(last_log_terms)) == 1 + + flush_lsns.sort(reverse=True) + commit_lsn = flush_lsns[len(sk_set) // 2] + + log.info(f"sk_set: {sk_set}, flush_lsns: {flush_lsns}, commit_lsn: {commit_lsn}") + return commit_lsn + + commit_lsn_before_migration = get_commit_lsn(sk_set) + + # Make two migrations, so the lagging sk stays in the sk_set, but other sks are replaced. + new_sk_set1 = [sk_set[0], sk_set[1], other_sk] # remove sk_set[2], add other_sk + new_sk_set2 = [sk_set[0], other_sk, sk_set[2]] # remove sk_set[1], add sk_set[2] back + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, new_sk_set1 + ) + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, new_sk_set2 + ) + + commit_lsn_after_migration = get_commit_lsn(new_sk_set2) + + # We should not lose committed WAL. + # If we have choosen the lagging sk to pull the timeline from, this might fail. + assert commit_lsn_before_migration <= commit_lsn_after_migration + + ep.start(safekeeper_generation=5, safekeepers=new_sk_set2) + assert ep.safe_psql("SELECT * FROM t") == [(0,), (1,)] From f3ee6e818de91cde533199d56426dda4b1d16da5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20Szafra=C5=84ski?= Date: Thu, 31 Jul 2025 11:53:48 +0200 Subject: [PATCH 5/7] [proxy] Correctly classify ConnectErrors (#12793) As is, e.g. quota errors on wake compute are logged as "compute" errors. --- proxy/src/serverless/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 511bdc4e42..5b356c8460 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -458,7 +458,7 @@ pub(crate) enum LocalProxyConnError { impl ReportableError for HttpConnError { fn get_error_kind(&self) -> ErrorKind { match self { - HttpConnError::ConnectError(_) => ErrorKind::Compute, + HttpConnError::ConnectError(e) => e.get_error_kind(), HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute, HttpConnError::PostgresConnectionError(p) => match p.as_db_error() { // user provided a wrong database name From 8fe75961206877811c337f448ee9f0076c68598d Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Thu, 31 Jul 2025 12:11:30 +0200 Subject: [PATCH 6/7] chore(compute_tools): Delete unused anon_ext_fn_reassign.sql (#12787) It's an anon v1 failed launch artifact, I suppose. 
--- compute_tools/src/sql/anon_ext_fn_reassign.sql | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 compute_tools/src/sql/anon_ext_fn_reassign.sql diff --git a/compute_tools/src/sql/anon_ext_fn_reassign.sql b/compute_tools/src/sql/anon_ext_fn_reassign.sql deleted file mode 100644 index 714a6db1e9..0000000000 --- a/compute_tools/src/sql/anon_ext_fn_reassign.sql +++ /dev/null @@ -1,13 +0,0 @@ -DO $$ -DECLARE - query varchar; -BEGIN - FOR query IN - SELECT pg_catalog.format('ALTER FUNCTION %I(%s) OWNER TO {db_owner};', p.oid::regproc, pg_catalog.pg_get_function_identity_arguments(p.oid)) - FROM pg_catalog.pg_proc p - WHERE p.pronamespace OPERATOR(pg_catalog.=) 'anon'::regnamespace::oid - LOOP - EXECUTE query; - END LOOP; -END -$$; From f8fc0bf3c0552d7af5e257be11c38cd207e52929 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 31 Jul 2025 12:25:33 +0200 Subject: [PATCH 7/7] neon_local: use doc comments for help texts (#12270) Clap automatically uses doc comments as help/about texts. Doc comments are strictly better, since they're also used e.g. for IDE documentation, and are better formatted. This patch updates all `neon_local` commands to use doc comments (courtesy of GPT-o3). --- control_plane/src/bin/neon_local.rs | 520 ++++++++++++---------------- 1 file changed, 220 insertions(+), 300 deletions(-) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 95500b0b18..2b81c3957c 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -71,8 +71,9 @@ const DEFAULT_PG_VERSION_NUM: &str = "17"; const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/"; +/// Neon CLI. #[derive(clap::Parser)] -#[command(version = GIT_VERSION, about, name = "Neon CLI")] +#[command(version = GIT_VERSION, name = "Neon CLI")] struct Cli { #[command(subcommand)] command: NeonLocalCmd, @@ -107,30 +108,31 @@ enum NeonLocalCmd { Stop(StopCmdArgs), } +/// Initialize a new Neon repository, preparing configs for services to start with. #[derive(clap::Args)] -#[clap(about = "Initialize a new Neon repository, preparing configs for services to start with")] struct InitCmdArgs { - #[clap(long, help("How many pageservers to create (default 1)"))] + /// How many pageservers to create (default 1). + #[clap(long)] num_pageservers: Option, #[clap(long)] config: Option, - #[clap(long, help("Force initialization even if the repository is not empty"))] + /// Force initialization even if the repository is not empty. + #[clap(long, default_value = "must-not-exist")] #[arg(value_parser)] - #[clap(default_value = "must-not-exist")] force: InitForceMode, } +/// Start pageserver and safekeepers. #[derive(clap::Args)] -#[clap(about = "Start pageserver and safekeepers")] struct StartCmdArgs { #[clap(long = "start-timeout", default_value = "10s")] timeout: humantime::Duration, } +/// Stop pageserver and safekeepers. #[derive(clap::Args)] -#[clap(about = "Stop pageserver and safekeepers")] struct StopCmdArgs { #[arg(value_enum)] #[clap(long, default_value_t = StopMode::Fast)] @@ -143,8 +145,8 @@ enum StopMode { Immediate, } +/// Manage tenants. #[derive(clap::Subcommand)] -#[clap(about = "Manage tenants")] enum TenantCmd { List, Create(TenantCreateCmdArgs), @@ -155,38 +157,36 @@ enum TenantCmd { #[derive(clap::Args)] struct TenantCreateCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. 
+ #[clap(long = "tenant-id")] tenant_id: Option, - #[clap( - long, - help = "Use a specific timeline id when creating a tenant and its initial timeline" - )] + /// Use a specific timeline id when creating a tenant and its initial timeline. + #[clap(long)] timeline_id: Option, #[clap(short = 'c')] config: Vec, + /// Postgres version to use for the initial timeline. #[arg(default_value = DEFAULT_PG_VERSION_NUM)] - #[clap(long, help = "Postgres version to use for the initial timeline")] + #[clap(long)] pg_version: PgMajorVersion, - #[clap( - long, - help = "Use this tenant in future CLI commands where tenant_id is needed, but not specified" - )] + /// Use this tenant in future CLI commands where tenant_id is needed, but not specified. + #[clap(long)] set_default: bool, - #[clap(long, help = "Number of shards in the new tenant")] + /// Number of shards in the new tenant. + #[clap(long)] #[arg(default_value_t = 0)] shard_count: u8, - #[clap(long, help = "Sharding stripe size in pages")] + /// Sharding stripe size in pages. + #[clap(long)] shard_stripe_size: Option, - #[clap(long, help = "Placement policy shards in this tenant")] + /// Placement policy shards in this tenant. + #[clap(long)] #[arg(value_parser = parse_placement_policy)] placement_policy: Option, } @@ -195,44 +195,35 @@ fn parse_placement_policy(s: &str) -> anyhow::Result { Ok(serde_json::from_str::(s)?) } +/// Set a particular tenant as default in future CLI commands where tenant_id is needed, but not +/// specified. #[derive(clap::Args)] -#[clap( - about = "Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified" -)] struct TenantSetDefaultCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_id: TenantId, } #[derive(clap::Args)] struct TenantConfigCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_id: Option, #[clap(short = 'c')] config: Vec, } +/// Import a tenant that is present in remote storage, and create branches for its timelines. #[derive(clap::Args)] -#[clap( - about = "Import a tenant that is present in remote storage, and create branches for its timelines" -)] struct TenantImportCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_id: TenantId, } +/// Manage timelines. #[derive(clap::Subcommand)] -#[clap(about = "Manage timelines")] enum TimelineCmd { List(TimelineListCmdArgs), Branch(TimelineBranchCmdArgs), @@ -240,98 +231,87 @@ enum TimelineCmd { Import(TimelineImportCmdArgs), } +/// List all timelines available to this pageserver. #[derive(clap::Args)] -#[clap(about = "List all timelines available to this pageserver")] struct TimelineListCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_shard_id: Option, } +/// Create a new timeline, branching off from another timeline. 
#[derive(clap::Args)] -#[clap(about = "Create a new timeline, branching off from another timeline")] struct TimelineBranchCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_id: Option, - - #[clap(long, help = "New timeline's ID")] + /// New timeline's ID, as a 32-byte hexadecimal string. + #[clap(long)] timeline_id: Option, - - #[clap(long, help = "Human-readable alias for the new timeline")] + /// Human-readable alias for the new timeline. + #[clap(long)] branch_name: String, - - #[clap( - long, - help = "Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name." - )] + /// Use last Lsn of another timeline (and its data) as base when creating the new timeline. The + /// timeline gets resolved by its branch name. + #[clap(long)] ancestor_branch_name: Option, - - #[clap( - long, - help = "When using another timeline as base, use a specific Lsn in it instead of the latest one" - )] + /// When using another timeline as base, use a specific Lsn in it instead of the latest one. + #[clap(long)] ancestor_start_lsn: Option, } +/// Create a new blank timeline. #[derive(clap::Args)] -#[clap(about = "Create a new blank timeline")] struct TimelineCreateCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_id: Option, - - #[clap(long, help = "New timeline's ID")] + /// New timeline's ID, as a 32-byte hexadecimal string. + #[clap(long)] timeline_id: Option, - - #[clap(long, help = "Human-readable alias for the new timeline")] + /// Human-readable alias for the new timeline. + #[clap(long)] branch_name: String, + /// Postgres version. #[arg(default_value = DEFAULT_PG_VERSION_NUM)] - #[clap(long, help = "Postgres version")] + #[clap(long)] pg_version: PgMajorVersion, } +/// Import a timeline from a basebackup directory. #[derive(clap::Args)] -#[clap(about = "Import timeline from a basebackup directory")] struct TimelineImportCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_id: Option, - - #[clap(long, help = "New timeline's ID")] + /// New timeline's ID, as a 32-byte hexadecimal string. + #[clap(long)] timeline_id: TimelineId, - - #[clap(long, help = "Human-readable alias for the new timeline")] + /// Human-readable alias for the new timeline. + #[clap(long)] branch_name: String, - - #[clap(long, help = "Basebackup tarfile to import")] + /// Basebackup tarfile to import. + #[clap(long)] base_tarfile: PathBuf, - - #[clap(long, help = "Lsn the basebackup starts at")] + /// LSN the basebackup starts at. + #[clap(long)] base_lsn: Lsn, - - #[clap(long, help = "Wal to add after base")] + /// WAL to add after base. + #[clap(long)] wal_tarfile: Option, - - #[clap(long, help = "Lsn the basebackup ends at")] + /// LSN the basebackup ends at. + #[clap(long)] end_lsn: Option, + /// Postgres version of the basebackup being imported. #[arg(default_value = DEFAULT_PG_VERSION_NUM)] - #[clap(long, help = "Postgres version of the backup being imported")] + #[clap(long)] pg_version: PgMajorVersion, } +/// Manage pageservers. 
#[derive(clap::Subcommand)] -#[clap(about = "Manage pageservers")] enum PageserverCmd { Status(PageserverStatusCmdArgs), Start(PageserverStartCmdArgs), @@ -339,223 +319,202 @@ enum PageserverCmd { Restart(PageserverRestartCmdArgs), } +/// Show status of a local pageserver. #[derive(clap::Args)] -#[clap(about = "Show status of a local pageserver")] struct PageserverStatusCmdArgs { - #[clap(long = "id", help = "pageserver id")] + /// Pageserver ID. + #[clap(long = "id")] pageserver_id: Option, } +/// Start local pageserver. #[derive(clap::Args)] -#[clap(about = "Start local pageserver")] struct PageserverStartCmdArgs { - #[clap(long = "id", help = "pageserver id")] + /// Pageserver ID. + #[clap(long = "id")] pageserver_id: Option, - - #[clap(short = 't', long, help = "timeout until we fail the command")] + /// Timeout until we fail the command. + #[clap(short = 't', long)] #[arg(default_value = "10s")] start_timeout: humantime::Duration, } +/// Stop local pageserver. #[derive(clap::Args)] -#[clap(about = "Stop local pageserver")] struct PageserverStopCmdArgs { - #[clap(long = "id", help = "pageserver id")] + /// Pageserver ID. + #[clap(long = "id")] pageserver_id: Option, - - #[clap( - short = 'm', - help = "If 'immediate', don't flush repository data at shutdown" - )] + /// If 'immediate', don't flush repository data at shutdown + #[clap(short = 'm')] #[arg(value_enum, default_value = "fast")] stop_mode: StopMode, } +/// Restart local pageserver. #[derive(clap::Args)] -#[clap(about = "Restart local pageserver")] struct PageserverRestartCmdArgs { - #[clap(long = "id", help = "pageserver id")] + /// Pageserver ID. + #[clap(long = "id")] pageserver_id: Option, - - #[clap(short = 't', long, help = "timeout until we fail the command")] + /// Timeout until we fail the command. + #[clap(short = 't', long)] #[arg(default_value = "10s")] start_timeout: humantime::Duration, } +/// Manage storage controller. #[derive(clap::Subcommand)] -#[clap(about = "Manage storage controller")] enum StorageControllerCmd { Start(StorageControllerStartCmdArgs), Stop(StorageControllerStopCmdArgs), } +/// Start storage controller. #[derive(clap::Args)] -#[clap(about = "Start storage controller")] struct StorageControllerStartCmdArgs { - #[clap(short = 't', long, help = "timeout until we fail the command")] + /// Timeout until we fail the command. + #[clap(short = 't', long)] #[arg(default_value = "10s")] start_timeout: humantime::Duration, - - #[clap( - long, - help = "Identifier used to distinguish storage controller instances" - )] + /// Identifier used to distinguish storage controller instances. + #[clap(long)] #[arg(default_value_t = 1)] instance_id: u8, - - #[clap( - long, - help = "Base port for the storage controller instance idenfified by instance-id (defaults to pageserver cplane api)" - )] + /// Base port for the storage controller instance identified by instance-id (defaults to + /// pageserver cplane api). + #[clap(long)] base_port: Option, - #[clap( - long, - help = "Whether the storage controller should handle pageserver-reported local disk loss events." - )] + /// Whether the storage controller should handle pageserver-reported local disk loss events. + #[clap(long)] handle_ps_local_disk_loss: Option, } +/// Stop storage controller. 
#[derive(clap::Args)] -#[clap(about = "Stop storage controller")] struct StorageControllerStopCmdArgs { - #[clap( - short = 'm', - help = "If 'immediate', don't flush repository data at shutdown" - )] + /// If 'immediate', don't flush repository data at shutdown + #[clap(short = 'm')] #[arg(value_enum, default_value = "fast")] stop_mode: StopMode, - - #[clap( - long, - help = "Identifier used to distinguish storage controller instances" - )] + /// Identifier used to distinguish storage controller instances. + #[clap(long)] #[arg(default_value_t = 1)] instance_id: u8, } +/// Manage storage broker. #[derive(clap::Subcommand)] -#[clap(about = "Manage storage broker")] enum StorageBrokerCmd { Start(StorageBrokerStartCmdArgs), Stop(StorageBrokerStopCmdArgs), } +/// Start broker. #[derive(clap::Args)] -#[clap(about = "Start broker")] struct StorageBrokerStartCmdArgs { - #[clap(short = 't', long, help = "timeout until we fail the command")] - #[arg(default_value = "10s")] + /// Timeout until we fail the command. + #[clap(short = 't', long, default_value = "10s")] start_timeout: humantime::Duration, } +/// Stop broker. #[derive(clap::Args)] -#[clap(about = "stop broker")] struct StorageBrokerStopCmdArgs { - #[clap( - short = 'm', - help = "If 'immediate', don't flush repository data at shutdown" - )] + /// If 'immediate', don't flush repository data on shutdown. + #[clap(short = 'm')] #[arg(value_enum, default_value = "fast")] stop_mode: StopMode, } +/// Manage safekeepers. #[derive(clap::Subcommand)] -#[clap(about = "Manage safekeepers")] enum SafekeeperCmd { Start(SafekeeperStartCmdArgs), Stop(SafekeeperStopCmdArgs), Restart(SafekeeperRestartCmdArgs), } +/// Manage object storage. #[derive(clap::Subcommand)] -#[clap(about = "Manage object storage")] enum EndpointStorageCmd { Start(EndpointStorageStartCmd), Stop(EndpointStorageStopCmd), } +/// Start object storage. #[derive(clap::Args)] -#[clap(about = "Start object storage")] struct EndpointStorageStartCmd { - #[clap(short = 't', long, help = "timeout until we fail the command")] + /// Timeout until we fail the command. + #[clap(short = 't', long)] #[arg(default_value = "10s")] start_timeout: humantime::Duration, } +/// Stop object storage. #[derive(clap::Args)] -#[clap(about = "Stop object storage")] struct EndpointStorageStopCmd { + /// If 'immediate', don't flush repository data on shutdown. + #[clap(short = 'm')] #[arg(value_enum, default_value = "fast")] - #[clap( - short = 'm', - help = "If 'immediate', don't flush repository data at shutdown" - )] stop_mode: StopMode, } +/// Start local safekeeper. #[derive(clap::Args)] -#[clap(about = "Start local safekeeper")] struct SafekeeperStartCmdArgs { - #[clap(help = "safekeeper id")] + /// Safekeeper ID. #[arg(default_value_t = NodeId(1))] id: NodeId, - #[clap( - short = 'e', - long = "safekeeper-extra-opt", - help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo" - )] + /// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo. + #[clap(short = 'e', long = "safekeeper-extra-opt")] extra_opt: Vec, - #[clap(short = 't', long, help = "timeout until we fail the command")] + /// Timeout until we fail the command. + #[clap(short = 't', long)] #[arg(default_value = "10s")] start_timeout: humantime::Duration, } +/// Stop local safekeeper. #[derive(clap::Args)] -#[clap(about = "Stop local safekeeper")] struct SafekeeperStopCmdArgs { - #[clap(help = "safekeeper id")] + /// Safekeeper ID. 
#[arg(default_value_t = NodeId(1))] id: NodeId, + /// If 'immediate', don't flush repository data on shutdown. #[arg(value_enum, default_value = "fast")] - #[clap( - short = 'm', - help = "If 'immediate', don't flush repository data at shutdown" - )] + #[clap(short = 'm')] stop_mode: StopMode, } +/// Restart local safekeeper. #[derive(clap::Args)] -#[clap(about = "Restart local safekeeper")] struct SafekeeperRestartCmdArgs { - #[clap(help = "safekeeper id")] + /// Safekeeper ID. #[arg(default_value_t = NodeId(1))] id: NodeId, + /// If 'immediate', don't flush repository data on shutdown. #[arg(value_enum, default_value = "fast")] - #[clap( - short = 'm', - help = "If 'immediate', don't flush repository data at shutdown" - )] + #[clap(short = 'm')] stop_mode: StopMode, - #[clap( - short = 'e', - long = "safekeeper-extra-opt", - help = "Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo" - )] + /// Additional safekeeper invocation options, e.g. -e=--http-auth-public-key-path=foo. + #[clap(short = 'e', long = "safekeeper-extra-opt")] extra_opt: Vec, - #[clap(short = 't', long, help = "timeout until we fail the command")] + /// Timeout until we fail the command. + #[clap(short = 't', long)] #[arg(default_value = "10s")] start_timeout: humantime::Duration, } +/// Manage Postgres instances. #[derive(clap::Subcommand)] -#[clap(about = "Manage Postgres instances")] enum EndpointCmd { List(EndpointListCmdArgs), Create(EndpointCreateCmdArgs), @@ -567,33 +526,27 @@ enum EndpointCmd { GenerateJwt(EndpointGenerateJwtCmdArgs), } +/// List endpoints. #[derive(clap::Args)] -#[clap(about = "List endpoints")] struct EndpointListCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_shard_id: Option, } +/// Create a compute endpoint. #[derive(clap::Args)] -#[clap(about = "Create a compute endpoint")] struct EndpointCreateCmdArgs { - #[clap( - long = "tenant-id", - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long = "tenant-id")] tenant_id: Option, - - #[clap(help = "Postgres endpoint id")] + /// Postgres endpoint ID. endpoint_id: Option, - #[clap(long, help = "Name of the branch the endpoint will run on")] + /// Name of the branch the endpoint will run on. + #[clap(long)] branch_name: Option, - #[clap( - long, - help = "Specify Lsn on the timeline to start from. By default, end of the timeline would be used" - )] + /// Specify LSN on the timeline to start from. By default, end of the timeline would be used. + #[clap(long)] lsn: Option, #[clap(long)] pg_port: Option, @@ -604,16 +557,13 @@ struct EndpointCreateCmdArgs { #[clap(long = "pageserver-id")] endpoint_pageserver_id: Option, - #[clap( - long, - help = "Don't do basebackup, create endpoint directory with only config files", - action = clap::ArgAction::Set, - default_value_t = false - )] + /// Don't do basebackup, create endpoint directory with only config files. + #[clap(long, action = clap::ArgAction::Set, default_value_t = false)] config_only: bool, + /// Postgres version. #[arg(default_value = DEFAULT_PG_VERSION_NUM)] - #[clap(long, help = "Postgres version")] + #[clap(long)] pg_version: PgMajorVersion, /// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings. 
@@ -624,170 +574,140 @@ struct EndpointCreateCmdArgs { #[clap(long)] grpc: bool, - #[clap( - long, - help = "If set, the node will be a hot replica on the specified timeline", - action = clap::ArgAction::Set, - default_value_t = false - )] + /// If set, the node will be a hot replica on the specified timeline. + #[clap(long, action = clap::ArgAction::Set, default_value_t = false)] hot_standby: bool, - - #[clap(long, help = "If set, will set up the catalog for neon_superuser")] + /// If set, will set up the catalog for neon_superuser. + #[clap(long)] update_catalog: bool, - - #[clap( - long, - help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests." - )] + /// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but + /// useful for tests. + #[clap(long)] allow_multiple: bool, - /// Only allow changing it on creation - #[clap(long, help = "Name of the privileged role for the endpoint")] + /// Name of the privileged role for the endpoint. + // Only allow changing it on creation. + #[clap(long)] privileged_role_name: Option, } +/// Start Postgres. If the endpoint doesn't exist yet, it is created. #[derive(clap::Args)] -#[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")] struct EndpointStartCmdArgs { - #[clap(help = "Postgres endpoint id")] + /// Postgres endpoint ID. endpoint_id: String, + /// Pageserver ID. #[clap(long = "pageserver-id")] endpoint_pageserver_id: Option, - - #[clap( - long, - help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations." - )] + /// Safekeepers membership generation to prefix neon.safekeepers with. + #[clap(long)] safekeepers_generation: Option, - #[clap( - long, - help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override." - )] + /// List of safekeepers endpoint will talk to. + #[clap(long)] safekeepers: Option, - - #[clap( - long, - help = "Configure the remote extensions storage proxy gateway URL to request for extensions.", - alias = "remote-ext-config" - )] + /// Configure the remote extensions storage proxy gateway URL to request for extensions. + #[clap(long, alias = "remote-ext-config")] remote_ext_base_url: Option, - - #[clap( - long, - help = "If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`" - )] + /// If set, will create test user `user` and `neondb` database. Requires `update-catalog = true` + #[clap(long)] create_test_user: bool, - - #[clap( - long, - help = "Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests." - )] + /// Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but + /// useful for tests. + #[clap(long)] allow_multiple: bool, - - #[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")] + /// Timeout until we fail the command. 
+    #[clap(short = 't', long, value_parser= humantime::parse_duration)]
     #[arg(default_value = "90s")]
     start_timeout: Duration,

-    #[clap(
-        long,
-        help = "Download LFC cache from endpoint storage on endpoint startup",
-        default_value = "false"
-    )]
+    /// Download LFC cache from endpoint storage on endpoint startup.
+    #[clap(long, default_value = "false")]
     autoprewarm: bool,

-    #[clap(long, help = "Upload LFC cache to endpoint storage periodically")]
+    /// Upload LFC cache to endpoint storage periodically.
+    #[clap(long)]
     offload_lfc_interval_seconds: Option,

-    #[clap(
-        long,
-        help = "Run in development mode, skipping VM-specific operations like process termination",
-        action = clap::ArgAction::SetTrue
-    )]
+    /// Run in development mode, skipping VM-specific operations like process termination.
+    #[clap(long, action = clap::ArgAction::SetTrue)]
     dev: bool,
 }

+/// Reconfigure an endpoint.
 #[derive(clap::Args)]
-#[clap(about = "Reconfigure an endpoint")]
 struct EndpointReconfigureCmdArgs {
-    #[clap(
-        long = "tenant-id",
-        help = "Tenant id. Represented as a hexadecimal string 32 symbols length"
-    )]
+    /// Tenant ID, as a 32-byte hexadecimal string.
+    #[clap(long = "tenant-id")]
     tenant_id: Option,
-
-    #[clap(help = "Postgres endpoint id")]
+    /// Postgres endpoint ID.
     endpoint_id: String,

+    /// Pageserver ID.
     #[clap(long = "pageserver-id")]
     endpoint_pageserver_id: Option,
-
     #[clap(long)]
     safekeepers: Option,
 }

+/// Refresh the endpoint's configuration by forcing it to reload its spec.
 #[derive(clap::Args)]
-#[clap(about = "Refresh the endpoint's configuration by forcing it reload it's spec")]
 struct EndpointRefreshConfigurationArgs {
-    #[clap(help = "Postgres endpoint id")]
+    /// Postgres endpoint ID.
     endpoint_id: String,
 }

+/// Stop an endpoint.
 #[derive(clap::Args)]
-#[clap(about = "Stop an endpoint")]
 struct EndpointStopCmdArgs {
-    #[clap(help = "Postgres endpoint id")]
+    /// Postgres endpoint ID.
     endpoint_id: String,
-
-    #[clap(
-        long,
-        help = "Also delete data directory (now optional, should be default in future)"
-    )]
+    /// Also delete data directory (now optional, should be default in future).
+    #[clap(long)]
     destroy: bool,

-    #[clap(long, help = "Postgres shutdown mode")]
+    /// Postgres shutdown mode, passed to `pg_ctl -m`.
+    #[clap(long)]
     #[clap(default_value = "fast")]
     mode: EndpointTerminateMode,
 }

+/// Update the pageservers in the spec file of the compute endpoint.
 #[derive(clap::Args)]
-#[clap(about = "Update the pageservers in the spec file of the compute endpoint")]
 struct EndpointUpdatePageserversCmdArgs {
-    #[clap(help = "Postgres endpoint id")]
+    /// Postgres endpoint ID.
     endpoint_id: String,

-    #[clap(short = 'p', long, help = "Specified pageserver id")]
+    /// Specified pageserver ID.
+    #[clap(short = 'p', long)]
     pageserver_id: Option,
 }

+/// Generate a JWT for an endpoint.
 #[derive(clap::Args)]
-#[clap(about = "Generate a JWT for an endpoint")]
 struct EndpointGenerateJwtCmdArgs {
-    #[clap(help = "Postgres endpoint id")]
+    /// Postgres endpoint ID.
     endpoint_id: String,
-
-    #[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
+    /// Scope to generate the JWT with.
+    #[clap(short = 's', long, value_parser = ComputeClaimsScope::from_str)]
     scope: Option,
 }

+/// Manage neon_local branch name mappings.
 #[derive(clap::Subcommand)]
-#[clap(about = "Manage neon_local branch name mappings")]
 enum MappingsCmd {
     Map(MappingsMapCmdArgs),
 }

+/// Create new mapping which cannot exist already.
#[derive(clap::Args)] -#[clap(about = "Create new mapping which cannot exist already")] struct MappingsMapCmdArgs { - #[clap( - long, - help = "Tenant id. Represented as a hexadecimal string 32 symbols length" - )] + /// Tenant ID, as a 32-byte hexadecimal string. + #[clap(long)] tenant_id: TenantId, - #[clap( - long, - help = "Timeline id. Represented as a hexadecimal string 32 symbols length" - )] + /// Timeline ID, as a 32-byte hexadecimal string. + #[clap(long)] timeline_id: TimelineId, - #[clap(long, help = "Branch name to give to the timeline")] + /// Branch name to give to the timeline. + #[clap(long)] branch_name: String, }
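To make the motivation for PATCH 7/7 concrete: with clap's derive support, a doc comment on a struct becomes the command's about text and a doc comment on a field becomes that argument's help text, which is what makes the explicit `about = ...` and `help = ...` strings above redundant. A minimal, self-contained sketch, assuming clap 4 with the `derive` feature; the `Cli` struct and its flags are illustrative, not taken from `neon_local`:

use clap::Parser;

/// Example tool: this struct-level doc comment becomes the command's about text.
#[derive(Parser, Debug)]
struct Cli {
    /// Tenant ID, as a hexadecimal string (this line becomes the flag's help text).
    #[clap(long = "tenant-id")]
    tenant_id: Option<String>,

    /// How many pageservers to create (default 1).
    #[clap(long, default_value_t = 1)]
    num_pageservers: u32,
}

fn main() {
    // `--help` renders the doc comments above; no `about`/`help` attributes needed.
    let cli = Cli::parse();
    println!("{cli:?}");
}

Running the sketch with `--help` prints those doc comments as the about and per-flag help text, which is why the string attributes can be dropped in favour of doc comments.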