From d1b60fa0b69dde210ec449062b0565cb4c1889a8 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 11 Mar 2025 10:48:50 +0000 Subject: [PATCH 1/9] fix(proxy): delete prepared statements when discarding (#11165) Fixes https://github.com/neondatabase/serverless/issues/144 When tables have enums, we need to perform type queries for that data. We cache these query statements for performance reasons. In Neon RLS, we run "discard all" for security reasons, which discards all the statements. When we need to type check again, the statements are no longer valid. This fixes it to discard the statements as well. I've also added some new logs and error types to monitor this. Currently we don't see the prepared statement errors in our logs. --- libs/proxy/tokio-postgres2/src/client.rs | 12 +++ proxy/src/serverless/local_conn_pool.rs | 14 +++- proxy/src/serverless/sql_over_http.rs | 95 ++++++++++++++++-------- 3 files changed, 87 insertions(+), 34 deletions(-) diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index c70cb598de..08a06163e1 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -284,6 +284,18 @@ impl Client { simple_query::batch_execute(self.inner(), query).await } + pub async fn discard_all(&self) -> Result { + // clear the prepared statements that are about to be nuked from the postgres session + { + let mut typeinfo = self.inner.cached_typeinfo.lock(); + typeinfo.typeinfo = None; + typeinfo.typeinfo_composite = None; + typeinfo.typeinfo_enum = None; + } + + self.batch_execute("discard all").await + } + /// Begins a new database transaction. /// /// The transaction will roll back by default - use the `commit` method to commit it. diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index 8426a0810e..c958d077fc 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -35,6 +35,7 @@ use super::conn_pool_lib::{ Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, DbUserConn, EndpointConnPool, }; +use super::sql_over_http::SqlOverHttpError; use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::Metrics; @@ -274,18 +275,23 @@ pub(crate) fn poll_client( } impl ClientInnerCommon { - pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> { + pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), SqlOverHttpError> { if let ClientDataEnum::Local(local_data) = &mut self.data { local_data.jti += 1; let token = resign_jwt(&local_data.key, payload, local_data.jti)?; - // discard all cannot run in a transaction. must be executed alone. - self.inner.batch_execute("discard all").await?; + self.inner + .discard_all() + .await + .map_err(SqlOverHttpError::InternalPostgres)?; // initiates the auth session // this is safe from query injections as the jwt format free of any escape characters. let query = format!("select auth.jwt_session_init('{token}')"); - self.inner.batch_execute(&query).await?; + self.inner + .batch_execute(&query) + .await + .map_err(SqlOverHttpError::InternalPostgres)?; let pid = self.inner.get_process_id(); info!(pid, jti = local_data.jti, "user session state init"); diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 93dd531f70..612702231f 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -412,8 +412,12 @@ pub(crate) enum SqlOverHttpError { ResponseTooLarge(usize), #[error("invalid isolation level")] InvalidIsolationLevel, + /// for queries our customers choose to run #[error("{0}")] - Postgres(#[from] postgres_client::Error), + Postgres(#[source] postgres_client::Error), + /// for queries we choose to run + #[error("{0}")] + InternalPostgres(#[source] postgres_client::Error), #[error("{0}")] JsonConversion(#[from] JsonConversionError), #[error("{0}")] @@ -429,6 +433,13 @@ impl ReportableError for SqlOverHttpError { SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User, SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User, SqlOverHttpError::Postgres(p) => p.get_error_kind(), + SqlOverHttpError::InternalPostgres(p) => { + if p.as_db_error().is_some() { + ErrorKind::Service + } else { + ErrorKind::Compute + } + } SqlOverHttpError::JsonConversion(_) => ErrorKind::Postgres, SqlOverHttpError::Cancelled(c) => c.get_error_kind(), } @@ -444,6 +455,7 @@ impl UserFacingError for SqlOverHttpError { SqlOverHttpError::ResponseTooLarge(_) => self.to_string(), SqlOverHttpError::InvalidIsolationLevel => self.to_string(), SqlOverHttpError::Postgres(p) => p.to_string(), + SqlOverHttpError::InternalPostgres(p) => p.to_string(), SqlOverHttpError::JsonConversion(_) => "could not parse postgres response".to_string(), SqlOverHttpError::Cancelled(_) => self.to_string(), } @@ -462,6 +474,7 @@ impl HttpCodeError for SqlOverHttpError { SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE, SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST, SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST, + SqlOverHttpError::InternalPostgres(_) => StatusCode::INTERNAL_SERVER_ERROR, SqlOverHttpError::JsonConversion(_) => StatusCode::INTERNAL_SERVER_ERROR, SqlOverHttpError::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, } @@ -671,16 +684,14 @@ async fn handle_db_inner( let authenticate_and_connect = Box::pin( async { let keys = match auth { - AuthData::Password(pw) => { - backend - .authenticate_with_password(ctx, &conn_info.user_info, &pw) - .await? - } - AuthData::Jwt(jwt) => { - backend - .authenticate_with_jwt(ctx, &conn_info.user_info, jwt) - .await? - } + AuthData::Password(pw) => backend + .authenticate_with_password(ctx, &conn_info.user_info, &pw) + .await + .map_err(HttpConnError::AuthError)?, + AuthData::Jwt(jwt) => backend + .authenticate_with_jwt(ctx, &conn_info.user_info, jwt) + .await + .map_err(HttpConnError::AuthError)?, }; let client = match keys.keys { @@ -703,7 +714,7 @@ async fn handle_db_inner( // not strictly necessary to mark success here, // but it's just insurance for if we forget it somewhere else ctx.success(); - Ok::<_, HttpConnError>(client) + Ok::<_, SqlOverHttpError>(client) } .map_err(SqlOverHttpError::from), ); @@ -933,11 +944,15 @@ impl BatchQueryData { builder = builder.deferrable(true); } - let transaction = builder.start().await.inspect_err(|_| { - // if we cannot start a transaction, we should return immediately - // and not return to the pool. connection is clearly broken - discard.discard(); - })?; + let transaction = builder + .start() + .await + .inspect_err(|_| { + // if we cannot start a transaction, we should return immediately + // and not return to the pool. connection is clearly broken + discard.discard(); + }) + .map_err(SqlOverHttpError::Postgres)?; let json_output = match query_batch( config, @@ -950,11 +965,15 @@ impl BatchQueryData { { Ok(json_output) => { info!("commit"); - let status = transaction.commit().await.inspect_err(|_| { - // if we cannot commit - for now don't return connection to pool - // TODO: get a query status from the error - discard.discard(); - })?; + let status = transaction + .commit() + .await + .inspect_err(|_| { + // if we cannot commit - for now don't return connection to pool + // TODO: get a query status from the error + discard.discard(); + }) + .map_err(SqlOverHttpError::Postgres)?; discard.check_idle(status); json_output } @@ -969,11 +988,15 @@ impl BatchQueryData { } Err(err) => { info!("rollback"); - let status = transaction.rollback().await.inspect_err(|_| { - // if we cannot rollback - for now don't return connection to pool - // TODO: get a query status from the error - discard.discard(); - })?; + let status = transaction + .rollback() + .await + .inspect_err(|_| { + // if we cannot rollback - for now don't return connection to pool + // TODO: get a query status from the error + discard.discard(); + }) + .map_err(SqlOverHttpError::Postgres)?; discard.check_idle(status); return Err(err); } @@ -1032,7 +1055,12 @@ async fn query_to_json( let query_start = Instant::now(); let query_params = data.params; - let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?); + let mut row_stream = std::pin::pin!( + client + .query_raw_txt(&data.query, query_params) + .await + .map_err(SqlOverHttpError::Postgres)? + ); let query_acknowledged = Instant::now(); // Manually drain the stream into a vector to leave row_stream hanging @@ -1040,7 +1068,7 @@ async fn query_to_json( // big. let mut rows: Vec = Vec::new(); while let Some(row) = row_stream.next().await { - let row = row?; + let row = row.map_err(SqlOverHttpError::Postgres)?; *current_size += row.body_len(); rows.push(row); // we don't have a streaming response support yet so this is to prevent OOM @@ -1091,7 +1119,14 @@ async fn query_to_json( "dataTypeModifier": c.type_modifier(), "format": "text", })); - columns.push(client.get_type(c.type_oid()).await?); + + match client.get_type(c.type_oid()).await { + Ok(t) => columns.push(t), + Err(err) => { + tracing::warn!(?err, "unable to query type information"); + return Err(SqlOverHttpError::InternalPostgres(err)); + } + } } let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode); From f466c0199581cfdad9c953c34a1f6bb0b40de78e Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 11 Mar 2025 12:43:55 +0100 Subject: [PATCH 2/9] pageserver: add `max_logical_size_per_shard` for `get_top_tenants` (#11157) ## Problem In #11122, we want to split shards once the logical size of the largest timeline exceeds a split threshold. However, `get_top_tenants` currently only returns `max_logical_size`, which tracks the max _total_ logical size of a timeline across all shards. This is problematic, because the storage controller needs to fetch a list of N tenants that are eligible for splits, but the API doesn't currently have a way to express this. For example, with a split threshold of 1 GB, a tenant with `max_logical_size` of 4 GB is eligible to split if it has 1 or 2 shards, but not if it already has 4 shards. We need to express this in per-shard terms, otherwise the `get_top_tenants` endpoint may end up only returning tenants that can't be split, blocking splits entirely. Touches https://github.com/neondatabase/neon/pull/11122. Touches https://github.com/neondatabase/cloud/issues/22532. ## Summary of changes Add `TenantShardItem::max_logical_size_per_shard` containing `max_logical_size / shard_count`, and `TenantSorting::MaxLogicalSizePerShard` to order and filter by it. --- libs/pageserver_api/src/models.rs | 18 +++++++++++++++--- pageserver/src/http/routes.rs | 1 + pageserver/src/tenant.rs | 5 +++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 13a9b5d89e..b1ebad83b1 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1476,8 +1476,14 @@ pub struct TenantScanRemoteStorageResponse { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "snake_case")] pub enum TenantSorting { + /// Total size of layers on local disk for all timelines in a shard. ResidentSize, + /// The logical size of the largest timeline within a _tenant_ (not shard). Only tracked on + /// shard 0, contains the sum across all shards. MaxLogicalSize, + /// The logical size of the largest timeline within a _tenant_ (not shard), divided by number of + /// shards. Only tracked on shard 0, and estimates the per-shard logical size. + MaxLogicalSizePerShard, } impl Default for TenantSorting { @@ -1507,14 +1513,20 @@ pub struct TopTenantShardsRequest { pub struct TopTenantShardItem { pub id: TenantShardId, - /// Total size of layers on local disk for all timelines in this tenant + /// Total size of layers on local disk for all timelines in this shard. pub resident_size: u64, - /// Total size of layers in remote storage for all timelines in this tenant + /// Total size of layers in remote storage for all timelines in this shard. pub physical_size: u64, - /// The largest logical size of a timeline within this tenant + /// The largest logical size of a timeline within this _tenant_ (not shard). This is only + /// tracked on shard 0, and contains the sum of the logical size across all shards. pub max_logical_size: u64, + + /// The largest logical size of a timeline within this _tenant_ (not shard) divided by number of + /// shards. This is only tracked on shard 0, and is only an estimate as we divide it evenly by + /// shard count, rounded up. + pub max_logical_size_per_shard: u64, } #[derive(Serialize, Deserialize, Debug, Default)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 77bfab47e0..e5848bfd25 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3223,6 +3223,7 @@ async fn post_top_tenants( match order_by { TenantSorting::ResidentSize => sizes.resident_size, TenantSorting::MaxLogicalSize => sizes.max_logical_size, + TenantSorting::MaxLogicalSizePerShard => sizes.max_logical_size_per_shard, } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3a34c8e254..62e1cdac0c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3842,6 +3842,7 @@ impl Tenant { resident_size: 0, physical_size: 0, max_logical_size: 0, + max_logical_size_per_shard: 0, }; for timeline in self.timelines.lock().unwrap().values() { @@ -3858,6 +3859,10 @@ impl Tenant { ); } + result.max_logical_size_per_shard = result + .max_logical_size + .div_ceil(self.tenant_shard_id.shard_count.count() as u64); + result } } From 359c64c7797887694d7d0b5745bc2f99b34ac5ac Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 11 Mar 2025 17:01:00 +0300 Subject: [PATCH 3/9] walproposer: pre generations refactoring (#11060) ## Problem https://github.com/neondatabase/neon/issues/10851 ## Summary of changes Do some refactoring before making walproposer generations aware. - Rename SS_VOTING to SS_WAIT_VOTING, SS_IDLE to SS_WAIT_ELECTED - Continue to get rid of epochs: rename GetEpoch to GetLastLogTerm, donorEpoch to donorLastLogTerm - Instead of counting n_votes, n_connected, introduce explicit WalProposerState (collecting terms / voting / elected). Refactor out TermsCollected and VotesCollected; they will determine state transition differently depending whether generations are enabled or not. There is no new logic in this PR and thus no new tests. --- pgxn/neon/walproposer.c | 257 ++++++++++-------- pgxn/neon/walproposer.h | 22 +- pgxn/neon/walproposer_pg.c | 2 +- .../tests/walproposer_sim/walproposer_api.rs | 7 +- 4 files changed, 157 insertions(+), 131 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7ec4ec99fc..0336d63e8d 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -57,10 +57,11 @@ static void SendProposerGreeting(Safekeeper *sk); static void RecvAcceptorGreeting(Safekeeper *sk); static void SendVoteRequest(Safekeeper *sk); static void RecvVoteResponse(Safekeeper *sk); +static bool VotesCollected(WalProposer *wp); static void HandleElectedProposer(WalProposer *wp); static term_t GetHighestTerm(TermHistory *th); -static term_t GetEpoch(Safekeeper *sk); -static void DetermineEpochStartLsn(WalProposer *wp); +static term_t GetLastLogTerm(Safekeeper *sk); +static void ProcessPropStartPos(WalProposer *wp); static void SendProposerElected(Safekeeper *sk); static void StartStreaming(Safekeeper *sk); static void SendMessageToNode(Safekeeper *sk); @@ -97,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp = palloc0(sizeof(WalProposer)); wp->config = config; wp->api = api; + wp->state = WPS_COLLECTING_TERMS; wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list); @@ -518,7 +520,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * nodes are transferred from SS_VOTING to sending actual vote * requests. */ - case SS_VOTING: + case SS_WAIT_VOTING: wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk)); ResetConnection(sk); @@ -547,7 +549,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) /* * Idle state for waiting votes from quorum. */ - case SS_IDLE: + case SS_WAIT_ELECTED: wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk)); ResetConnection(sk); @@ -721,6 +723,15 @@ SendProposerGreeting(Safekeeper *sk) BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV); } +/* + * Have we received greeting from enough (quorum) safekeepers to start voting? + */ +static bool +TermsCollected(WalProposer *wp) +{ + return wp->n_connected >= wp->quorum; +} + static void RecvAcceptorGreeting(Safekeeper *sk) { @@ -754,7 +765,7 @@ RecvAcceptorGreeting(Safekeeper *sk) } /* Protocol is all good, move to voting. */ - sk->state = SS_VOTING; + sk->state = SS_WAIT_VOTING; /* * Note: it would be better to track the counter on per safekeeper basis, @@ -762,17 +773,18 @@ RecvAcceptorGreeting(Safekeeper *sk) * as is for now. */ ++wp->n_connected; - if (wp->n_connected <= wp->quorum) + if (wp->state == WPS_COLLECTING_TERMS) { /* We're still collecting terms from the majority. */ wp->propTerm = Max(sk->greetResponse.term, wp->propTerm); /* Quorum is acquried, prepare the vote request. */ - if (wp->n_connected == wp->quorum) + if (TermsCollected(wp)) { wp->propTerm++; wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); + wp->state = WPS_CAMPAIGN; wp->voteRequest.pam.tag = 'v'; wp->voteRequest.generation = wp->mconf.generation; wp->voteRequest.term = wp->propTerm; @@ -787,12 +799,10 @@ RecvAcceptorGreeting(Safekeeper *sk) } /* - * Check if we have quorum. If there aren't enough safekeepers, wait and - * do nothing. We'll eventually get a task when the election starts. - * - * If we do have quorum, we can start an election. + * If we have quorum, start (or just send vote request to newly connected + * node) election, otherwise wait until we have more greetings. */ - if (wp->n_connected < wp->quorum) + if (wp->state == WPS_COLLECTING_TERMS) { /* * SS_VOTING is an idle state; read-ready indicates the connection @@ -807,11 +817,7 @@ RecvAcceptorGreeting(Safekeeper *sk) */ for (int j = 0; j < wp->n_safekeepers; j++) { - /* - * Remember: SS_VOTING indicates that the safekeeper is - * participating in voting, but hasn't sent anything yet. - */ - if (wp->safekeeper[j].state == SS_VOTING) + if (wp->safekeeper[j].state == SS_WAIT_VOTING) SendVoteRequest(&wp->safekeeper[j]); } } @@ -838,6 +844,8 @@ RecvVoteResponse(Safekeeper *sk) { WalProposer *wp = sk->wp; + Assert(wp->state >= WPS_CAMPAIGN); + sk->voteResponse.apm.tag = 'v'; if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse)) return; @@ -856,7 +864,7 @@ RecvVoteResponse(Safekeeper *sk) * we are not elected yet and thus need the vote. */ if ((!sk->voteResponse.voteGiven) && - (sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum)) + (sk->voteResponse.term > wp->propTerm || wp->state == WPS_CAMPAIGN)) { wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, @@ -864,38 +872,83 @@ RecvVoteResponse(Safekeeper *sk) } Assert(sk->voteResponse.term == wp->propTerm); - /* Handshake completed, do we have quorum? */ + /* ready for elected message */ + sk->state = SS_WAIT_ELECTED; + wp->n_votes++; - if (wp->n_votes < wp->quorum) + /* Are we already elected? */ + if (wp->state == WPS_CAMPAIGN) { - sk->state = SS_IDLE; /* can't do much yet, no quorum */ - } - else if (wp->n_votes > wp->quorum) - { - /* already elected, start streaming */ - SendProposerElected(sk); + /* no; check if this vote makes us elected */ + if (VotesCollected(wp)) + { + wp->state = WPS_ELECTED; + HandleElectedProposer(wp); + } + else + { + /* can't do much yet, no quorum */ + return; + } } else { - sk->state = SS_IDLE; - /* Idle state waits for read-ready events */ - wp->api.update_event_set(sk, WL_SOCKET_READABLE); - - HandleElectedProposer(sk->wp); + Assert(wp->state == WPS_ELECTED); + /* send elected only to this sk */ + SendProposerElected(sk); } } +/* + * Checks if enough votes has been collected to get elected and if that's the + * case finds the highest vote, setting donor, donorLastLogTerm, + * propTermStartLsn fields. Also sets truncateLsn. + */ +static bool +VotesCollected(WalProposer *wp) +{ + int n_ready = 0; + + /* assumed to be called only when not elected yet */ + Assert(wp->state == WPS_CAMPAIGN); + + wp->propTermStartLsn = InvalidXLogRecPtr; + wp->donorLastLogTerm = 0; + wp->truncateLsn = InvalidXLogRecPtr; + + for (int i = 0; i < wp->n_safekeepers; i++) + { + if (wp->safekeeper[i].state == SS_WAIT_ELECTED) + { + n_ready++; + + if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm || + (GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm && + wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn)) + { + wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]); + wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn; + wp->donor = i; + } + wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); + } + } + + return n_ready >= wp->quorum; +} + /* * Called once a majority of acceptors have voted for us and current proposer * has been elected. * - * Sends ProposerElected message to all acceptors in SS_IDLE state and starts + * Sends ProposerElected message to all acceptors in SS_WAIT_ELECTED state and starts * replication from walsender. */ static void HandleElectedProposer(WalProposer *wp) { - DetermineEpochStartLsn(wp); + ProcessPropStartPos(wp); + Assert(wp->propTermStartLsn != InvalidXLogRecPtr); /* * Synchronously download WAL from the most advanced safekeeper. We do @@ -907,40 +960,24 @@ HandleElectedProposer(WalProposer *wp) wp_log(FATAL, "failed to download WAL for logical replicaiton"); } - /* - * Zero propEpochStartLsn means majority of safekeepers doesn't have any - * WAL, timeline was just created. Compute bumps it to basebackup LSN, - * otherwise we must be sync-safekeepers and we have nothing to do then. - * - * Proceeding is not only pointless but harmful, because we'd give - * safekeepers term history starting with 0/0. These hacks will go away - * once we disable implicit timeline creation on safekeepers and create it - * with non zero LSN from the start. - */ - if (wp->propEpochStartLsn == InvalidXLogRecPtr) - { - Assert(wp->config->syncSafekeepers); - wp_log(LOG, "elected with zero propEpochStartLsn in sync-safekeepers, exiting"); - wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); - } - - if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers) + if (wp->truncateLsn == wp->propTermStartLsn && wp->config->syncSafekeepers) { /* Sync is not needed: just exit */ - wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn); /* unreachable */ } for (int i = 0; i < wp->n_safekeepers; i++) { - if (wp->safekeeper[i].state == SS_IDLE) + if (wp->safekeeper[i].state == SS_WAIT_ELECTED) SendProposerElected(&wp->safekeeper[i]); } /* * The proposer has been elected, and there will be no quorum waiting - * after this point. There will be no safekeeper with state SS_IDLE also, - * because that state is used only for quorum waiting. + * after this point. There will be no safekeeper with state + * SS_WAIT_ELECTED also, because that state is used only for quorum + * waiting. */ if (wp->config->syncSafekeepers) @@ -957,7 +994,7 @@ HandleElectedProposer(WalProposer *wp) return; } - wp->api.start_streaming(wp, wp->propEpochStartLsn); + wp->api.start_streaming(wp, wp->propTermStartLsn); /* Should not return here */ } @@ -970,7 +1007,7 @@ GetHighestTerm(TermHistory *th) /* safekeeper's epoch is the term of the highest entry in the log */ static term_t -GetEpoch(Safekeeper *sk) +GetLastLogTerm(Safekeeper *sk) { return GetHighestTerm(&sk->voteResponse.termHistory); } @@ -991,72 +1028,52 @@ SkipXLogPageHeader(WalProposer *wp, XLogRecPtr lsn) } /* - * Called after majority of acceptors gave votes, it calculates the most - * advanced safekeeper (who will be the donor) and epochStartLsn -- LSN since - * which we'll write WAL in our term. - * - * Sets truncateLsn along the way (though it is not of much use at this point -- - * only for skipping recovery). + * Called after quorum gave votes and proposer starting position (highest vote + * term + flush LSN) -- is determined (VotesCollected true), this function + * adopts it: pushes LSN to shmem, sets wp term history, verifies that the + * basebackup matches. */ static void -DetermineEpochStartLsn(WalProposer *wp) +ProcessPropStartPos(WalProposer *wp) { TermHistory *dth; - int n_ready = 0; WalproposerShmemState *walprop_shared; - wp->propEpochStartLsn = InvalidXLogRecPtr; - wp->donorEpoch = 0; - wp->truncateLsn = InvalidXLogRecPtr; - - for (int i = 0; i < wp->n_safekeepers; i++) - { - if (wp->safekeeper[i].state == SS_IDLE) - { - n_ready++; - - if (GetEpoch(&wp->safekeeper[i]) > wp->donorEpoch || - (GetEpoch(&wp->safekeeper[i]) == wp->donorEpoch && - wp->safekeeper[i].voteResponse.flushLsn > wp->propEpochStartLsn)) - { - wp->donorEpoch = GetEpoch(&wp->safekeeper[i]); - wp->propEpochStartLsn = wp->safekeeper[i].voteResponse.flushLsn; - wp->donor = i; - } - wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); - } - } - - if (n_ready < wp->quorum) - { - /* - * This is a rare case that can be triggered if safekeeper has voted - * and disconnected. In this case, its state will not be SS_IDLE and - * its vote cannot be used, because we clean up `voteResponse` in - * `ShutdownConnection`. - */ - wp_log(FATAL, "missing majority of votes, collected %d, expected %d, got %d", wp->n_votes, wp->quorum, n_ready); - } + /* must have collected votes */ + Assert(wp->state == WPS_ELECTED); /* - * If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are - * bootstrapping and nothing was committed yet. Start streaming then from - * the basebackup LSN. + * If propTermStartLsn is 0, it means flushLsn is 0 everywhere, we are + * bootstrapping and nothing was committed yet. Start streaming from the + * basebackup LSN then. + * + * In case of sync-safekeepers just exit: proceeding is not only pointless + * but harmful, because we'd give safekeepers term history starting with + * 0/0. These hacks will go away once we disable implicit timeline + * creation on safekeepers and create it with non zero LSN from the start. */ - if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers) + if (wp->propTermStartLsn == InvalidXLogRecPtr) { - wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp); - wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); + if (!wp->config->syncSafekeepers) + { + wp->propTermStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp); + wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propTermStartLsn)); + } + else + { + wp_log(LOG, "elected with zero propTermStartLsn in sync-safekeepers, exiting"); + wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn); + } } - pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn); + pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propTermStartLsn); Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers); /* - * We will be generating WAL since propEpochStartLsn, so we should set + * We will be generating WAL since propTermStartLsn, so we should set * availableLsn to mark this LSN as the latest available position. */ - wp->availableLsn = wp->propEpochStartLsn; + wp->availableLsn = wp->propTermStartLsn; /* * Proposer's term history is the donor's + its own entry. @@ -1067,12 +1084,12 @@ DetermineEpochStartLsn(WalProposer *wp) if (dth->n_entries > 0) memcpy(wp->propTermHistory.entries, dth->entries, sizeof(TermSwitchEntry) * dth->n_entries); wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm; - wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn; + wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn; wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", wp->quorum, wp->propTerm, - LSN_FORMAT_ARGS(wp->propEpochStartLsn), + LSN_FORMAT_ARGS(wp->propTermStartLsn), wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port, LSN_FORMAT_ARGS(wp->truncateLsn)); @@ -1090,7 +1107,7 @@ DetermineEpochStartLsn(WalProposer *wp) * Safekeepers don't skip header as they need continious stream of * data, so correct LSN for comparison. */ - if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp)) + if (SkipXLogPageHeader(wp, wp->propTermStartLsn) != wp->api.get_redo_start_lsn(wp)) { /* * However, allow to proceed if last_log_term on the node which @@ -1111,8 +1128,8 @@ DetermineEpochStartLsn(WalProposer *wp) */ disable_core_dump(); wp_log(PANIC, - "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", - LSN_FORMAT_ARGS(wp->propEpochStartLsn), + "collected propTermStartLsn %X/%X, but basebackup LSN %X/%X", + LSN_FORMAT_ARGS(wp->propTermStartLsn), LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp))); } } @@ -1623,7 +1640,7 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp) * Like in Raft, we aren't allowed to commit entries from previous * terms, so ignore reported LSN until it gets to epochStartLsn. */ - responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propEpochStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0; + responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0; } qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn); @@ -1656,10 +1673,10 @@ UpdateDonorShmem(WalProposer *wp) * about its position immediately after election before any feedbacks are * sent. */ - if (wp->safekeeper[wp->donor].state >= SS_IDLE) + if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED) { donor = &wp->safekeeper[wp->donor]; - donor_lsn = wp->propEpochStartLsn; + donor_lsn = wp->propTermStartLsn; } /* @@ -1748,7 +1765,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk) for (int i = 0; i < wp->n_safekeepers; i++) { Safekeeper *sk = &wp->safekeeper[i]; - bool synced = sk->appendResponse.commitLsn >= wp->propEpochStartLsn; + bool synced = sk->appendResponse.commitLsn >= wp->propTermStartLsn; /* alive safekeeper which is not synced yet; wait for it */ if (sk->state != SS_OFFLINE && !synced) @@ -1772,7 +1789,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk) */ BroadcastAppendRequest(wp); - wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn); /* unreachable */ } } @@ -2378,7 +2395,7 @@ FormatSafekeeperState(Safekeeper *sk) case SS_HANDSHAKE_RECV: return_val = "handshake (receiving)"; break; - case SS_VOTING: + case SS_WAIT_VOTING: return_val = "voting"; break; case SS_WAIT_VERDICT: @@ -2387,7 +2404,7 @@ FormatSafekeeperState(Safekeeper *sk) case SS_SEND_ELECTED_FLUSH: return_val = "send-announcement-flush"; break; - case SS_IDLE: + case SS_WAIT_ELECTED: return_val = "idle"; break; case SS_ACTIVE: @@ -2476,8 +2493,8 @@ SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_even * Idle states use read-readiness as a sign that the connection * has been disconnected. */ - case SS_VOTING: - case SS_IDLE: + case SS_WAIT_VOTING: + case SS_WAIT_ELECTED: *sk_events = WL_SOCKET_READABLE; return; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 8d1ae26cac..d116bce806 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -73,12 +73,12 @@ typedef enum * Moved externally by execution of SS_HANDSHAKE_RECV, when we received a * quorum of handshakes. */ - SS_VOTING, + SS_WAIT_VOTING, /* * Already sent voting information, waiting to receive confirmation from - * the node. After receiving, moves to SS_IDLE, if the quorum isn't - * reached yet. + * the node. After receiving, moves to SS_WAIT_ELECTED, if the quorum + * isn't reached yet. */ SS_WAIT_VERDICT, @@ -91,7 +91,7 @@ typedef enum * * Moves to SS_ACTIVE only by call to StartStreaming. */ - SS_IDLE, + SS_WAIT_ELECTED, /* * Active phase, when we acquired quorum and have WAL to send or feedback @@ -751,6 +751,15 @@ typedef struct WalProposerConfig #endif } WalProposerConfig; +typedef enum +{ + /* collecting greetings to determine term to campaign for */ + WPS_COLLECTING_TERMS, + /* campaing started, waiting for votes */ + WPS_CAMPAIGN, + /* successfully elected */ + WPS_ELECTED, +} WalProposerState; /* * WAL proposer state. @@ -758,6 +767,7 @@ typedef struct WalProposerConfig typedef struct WalProposer { WalProposerConfig *config; + WalProposerState state; /* Current walproposer membership configuration */ MembershipConfiguration mconf; @@ -813,10 +823,10 @@ typedef struct WalProposer TermHistory propTermHistory; /* epoch start lsn of the proposer */ - XLogRecPtr propEpochStartLsn; + XLogRecPtr propTermStartLsn; /* Most advanced acceptor epoch */ - term_t donorEpoch; + term_t donorLastLogTerm; /* Most advanced acceptor */ int donor; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index b21184de57..9c34c90002 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1496,7 +1496,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk) snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port); Assert(!sk->xlogreader); - sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, log_prefix); + sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix); if (sk->xlogreader == NULL) wpg_log(FATAL, "failed to allocate xlog reader"); } diff --git a/safekeeper/tests/walproposer_sim/walproposer_api.rs b/safekeeper/tests/walproposer_sim/walproposer_api.rs index 6451589e80..82e7a32881 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_api.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_api.rs @@ -511,8 +511,7 @@ impl ApiImpl for SimulationApi { // collected quorum with lower term, then got rejected by next connected safekeeper executor::exit(1, msg.to_owned()); } - if msg.contains("collected propEpochStartLsn") && msg.contains(", but basebackup LSN ") - { + if msg.contains("collected propTermStartLsn") && msg.contains(", but basebackup LSN ") { // sync-safekeepers collected wrong quorum, walproposer collected another quorum executor::exit(1, msg.to_owned()); } @@ -529,7 +528,7 @@ impl ApiImpl for SimulationApi { } fn after_election(&self, wp: &mut walproposer::bindings::WalProposer) { - let prop_lsn = wp.propEpochStartLsn; + let prop_lsn = wp.propTermStartLsn; let prop_term = wp.propTerm; let mut prev_lsn: u64 = 0; @@ -612,7 +611,7 @@ impl ApiImpl for SimulationApi { sk: &mut walproposer::bindings::Safekeeper, ) -> bool { let mut startpos = wp.truncateLsn; - let endpos = wp.propEpochStartLsn; + let endpos = wp.propTermStartLsn; if startpos == endpos { debug!("recovery_download: nothing to download"); From 7588983168dbc2da7e025684180012e036f9b1b7 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 11 Mar 2025 10:33:35 -0400 Subject: [PATCH 4/9] fix(scrubber): log even if no refs are found (#11160) ## Problem Investigate https://github.com/neondatabase/neon/issues/11159 ## Summary of changes This doesn't fix the issue, but at least we can narrow down the cause next time it happens by logging ancestor referenced layer cnt even if it's 0. Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 3 ++- storage_scrubber/src/pageserver_physical_gc.rs | 12 +++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 42b36f7252..123079804b 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1091,7 +1091,7 @@ impl Timeline { let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn(); tracing::info!( - "latest_gc_cutoff: {}, pitr cutoff {}", + "starting shard ancestor compaction, latest_gc_cutoff: {}, pitr cutoff {}", *latest_gc_cutoff, self.gc_info.read().unwrap().cutoffs.time ); @@ -1120,6 +1120,7 @@ impl Timeline { // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer // should be !is_key_disposable() + // TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop. let range = layer_desc.get_key_range(); let mut key = range.start; while key < range.end { diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index c956b1abbc..f14341c7bc 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -152,10 +152,8 @@ impl TenantRefAccumulator { } } - if !ancestor_refs.is_empty() { - tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len()); - self.ancestor_ref_shards.update(ttid, ancestor_refs); - } + tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len()); + self.ancestor_ref_shards.update(ttid, ancestor_refs); } /// Consume Self and return a vector of ancestor tenant shards that should be GC'd, and map of referenced ancestor layers to preserve @@ -779,7 +777,7 @@ pub async fn pageserver_physical_gc( let mut summary = GcSummary::default(); { - let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); + let timelines = timelines.try_buffered(CONCURRENCY); let timelines = timelines.try_flatten(); let timelines = timelines.map_ok(|(ttid, tenant_manifest_arc)| { @@ -793,8 +791,8 @@ pub async fn pageserver_physical_gc( tenant_manifest_arc, ) }); - let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); - + let timelines = timelines.try_buffered(CONCURRENCY); + let mut timelines = std::pin::pin!(timelines); // Drain futures for per-shard GC, populating accumulator as a side effect while let Some(i) = timelines.next().await { summary.merge(i?); From 011f7c21a3151c2f88232e718b8b2f701ec94517 Mon Sep 17 00:00:00 2001 From: Ivan Efremov Date: Tue, 11 Mar 2025 19:17:30 +0200 Subject: [PATCH 5/9] fix(proxy): Add testodrome query id HTTP header (#11167) Handle "X-Neon-Query-ID" header to glue data with testodrome queries. Relates to the #22486 --- proxy/src/compute.rs | 2 +- proxy/src/context/mod.rs | 10 ++++++++-- proxy/src/serverless/backend.rs | 9 +++++++++ proxy/src/serverless/mod.rs | 9 +++++++++ 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 3852bfe348..26254beecf 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -290,7 +290,7 @@ impl ConnCfg { "connected to compute node at {host} ({socket_addr}) sslmode={:?}, latency={}, query_id={}", self.0.get_ssl_mode(), ctx.get_proxy_latency(), - ctx.get_testodrome_id(), + ctx.get_testodrome_id().unwrap_or_default(), ); // NB: CancelToken is supposed to hold socket_addr, but we use connect_raw. diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 4f72a86f30..7c1a6206c1 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -272,6 +272,13 @@ impl RequestContext { .set_user_agent(user_agent); } + pub(crate) fn set_testodrome_id(&self, query_id: String) { + self.0 + .try_lock() + .expect("should not deadlock") + .set_testodrome_id(query_id); + } + pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) { let mut this = self.0.try_lock().expect("should not deadlock"); this.auth_method = Some(auth_method); @@ -371,13 +378,12 @@ impl RequestContext { .accumulated() } - pub(crate) fn get_testodrome_id(&self) -> String { + pub(crate) fn get_testodrome_id(&self) -> Option { self.0 .try_lock() .expect("should not deadlock") .testodrome_query_id .clone() - .unwrap_or_default() } pub(crate) fn success(&self) { diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index b55661cec8..e40aa024a8 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -571,6 +571,11 @@ impl ConnectMechanism for TokioMechanism { "compute_id", tracing::field::display(&node_info.aux.compute_id), ); + + if let Some(query_id) = ctx.get_testodrome_id() { + info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id); + } + Ok(poll_client( self.pool.clone(), ctx, @@ -628,6 +633,10 @@ impl ConnectMechanism for HyperMechanism { tracing::field::display(&node_info.aux.compute_id), ); + if let Some(query_id) = ctx.get_testodrome_id() { + info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id); + } + Ok(poll_http2_client( self.pool.clone(), ctx, diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index acd6a05718..a7f46cbe58 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -446,6 +446,15 @@ async fn request_handler( .map(Into::into), ); + let testodrome_id = request + .headers() + .get("X-Neon-Query-ID") + .map(|value| value.to_str().unwrap_or_default().to_string()); + + if let Some(query_id) = testodrome_id { + ctx.set_testodrome_id(query_id); + } + let span = ctx.span(); info!(parent: &span, "performing websocket upgrade"); From 8983677f291bd082b146f4bd649c847a65061fb6 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Tue, 11 Mar 2025 23:09:32 +0400 Subject: [PATCH 6/9] Ignore cargo deny advisory RUSTSEC-2025-0014 for humantime (#11180) ## Problem `humantime` is not maintained and `cargo deny check` fails - Will be addressed in https://github.com/neondatabase/neon/issues/11179 ## Summary of changes Ignore RUSTSEC-2025-0014 advisory for now --- deny.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/deny.toml b/deny.toml index ed7aa9ef9f..1023b1833a 100644 --- a/deny.toml +++ b/deny.toml @@ -31,6 +31,10 @@ reason = "the marvin attack only affects private key decryption, not public key id = "RUSTSEC-2024-0436" reason = "The paste crate is a build-only dependency with no runtime components. It is unlikely to have any security impact." +[[advisories.ignore]] +id = "RUSTSEC-2025-0014" +reason = "The humantime is widely used and is not easy to replace right now. It is unmaintained, but it has no known vulnerabilities to care about. #11179" + # This section is considered when running `cargo deny check licenses` # More documentation for the licenses section can be found here: # https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html From 7d221214bbb9943710ce4f9b5c86d7bc941cf978 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 11 Mar 2025 15:13:52 -0400 Subject: [PATCH 7/9] feat(pageserver): support no-yield for gc-compaction (#11184) ## Problem This should also resolve the test flakiness of `test_gc_feedback`. close https://github.com/neondatabase/neon/issues/11144 ## Summary of changes If `NoYield` is set, do not yield in gc-compaction. --------- Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 55 +++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 123079804b..e6f2104e90 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -393,6 +393,9 @@ impl GcCompactionQueue { if job.dry_run { flags |= CompactFlags::DryRun; } + if options.flags.contains(CompactFlags::NoYield) { + flags |= CompactFlags::NoYield; + } let options = CompactOptions { flags, sub_compaction: false, @@ -2617,6 +2620,7 @@ impl Timeline { ) -> Result { let sub_compaction = options.sub_compaction; let job = GcCompactJob::from_compact_options(options.clone()); + let no_yield = options.flags.contains(CompactFlags::NoYield); if sub_compaction { info!( "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs" @@ -2631,14 +2635,15 @@ impl Timeline { idx + 1, jobs_len ); - self.compact_with_gc_inner(cancel, job, ctx).await?; + self.compact_with_gc_inner(cancel, job, ctx, no_yield) + .await?; } if jobs_len == 0 { info!("no jobs to run, skipping gc bottom-most compaction"); } return Ok(CompactionOutcome::Done); } - self.compact_with_gc_inner(cancel, job, ctx).await + self.compact_with_gc_inner(cancel, job, ctx, no_yield).await } async fn compact_with_gc_inner( @@ -2646,6 +2651,7 @@ impl Timeline { cancel: &CancellationToken, job: GcCompactJob, ctx: &RequestContext, + no_yield: bool, ) -> Result { // Block other compaction/GC tasks from running for now. GC-compaction could run along // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc. @@ -2915,14 +2921,18 @@ impl Timeline { if cancel.is_cancelled() { return Err(CompactionError::ShuttingDown); } - let should_yield = self - .l0_compaction_trigger - .notified() - .now_or_never() - .is_some(); - if should_yield { - tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers"); - return Ok(CompactionOutcome::YieldForL0); + if !no_yield { + let should_yield = self + .l0_compaction_trigger + .notified() + .now_or_never() + .is_some(); + if should_yield { + tracing::info!( + "preempt gc-compaction when downloading layers: too many L0 layers" + ); + return Ok(CompactionOutcome::YieldForL0); + } } let resident_layer = layer .download_and_keep_resident(ctx) @@ -3055,16 +3065,21 @@ impl Timeline { if cancel.is_cancelled() { return Err(CompactionError::ShuttingDown); } - keys_processed += 1; - if keys_processed % 1000 == 0 { - let should_yield = self - .l0_compaction_trigger - .notified() - .now_or_never() - .is_some(); - if should_yield { - tracing::info!("preempt gc-compaction in the main loop: too many L0 layers"); - return Ok(CompactionOutcome::YieldForL0); + + if !no_yield { + keys_processed += 1; + if keys_processed % 1000 == 0 { + let should_yield = self + .l0_compaction_trigger + .notified() + .now_or_never() + .is_some(); + if should_yield { + tracing::info!( + "preempt gc-compaction in the main loop: too many L0 layers" + ); + return Ok(CompactionOutcome::YieldForL0); + } } } if self.shard_identity.is_key_disposable(&key) { From 083a30b1e25b28a61009658d4bcdc2d6cb7e2db4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 11 Mar 2025 20:45:06 +0100 Subject: [PATCH 8/9] storage broker: disable deploy by default (#11172) context - https://github.com/neondatabase/cloud/issues/23486#issuecomment-2711587222 - companion infra.git PR: https://github.com/neondatabase/infra/pull/3249 --- .github/workflows/build_and_test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 197b83fac4..1c0971a49d 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -1175,7 +1175,7 @@ jobs: -f deployPgSniRouter=false \ -f deployProxy=false \ -f deployStorage=true \ - -f deployStorageBroker=true \ + -f deployStorageBroker=false \ -f deployStorageController=true \ -f branch=main \ -f dockerTag=${{needs.meta.outputs.build-tag}} \ @@ -1183,7 +1183,7 @@ jobs: gh workflow --repo neondatabase/infra run deploy-prod.yml --ref main \ -f deployStorage=true \ - -f deployStorageBroker=true \ + -f deployStorageBroker=false \ -f deployStorageController=true \ -f branch=main \ -f dockerTag=${{needs.meta.outputs.build-tag}} From 158db414bf881fb358494e3215d192c8fa420a53 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 11 Mar 2025 21:40:23 +0100 Subject: [PATCH 9/9] buffered writer: handle write errors by retrying all write IO errors indefinitely (#10993) # Problem If the Pageserver ingest path (InMemoryLayer=>EphemeralFile=>BufferedWriter) encounters ENOSPC or any other write IO error when flushing the mutable buffer of the BufferedWriter, the buffered writer is left in a state where subsequent _reads_ from the InMemoryLayer it will cause a `must not use after we returned an error` panic. The reason is that 1. the flush background task bails on flush failure, 2. causing the `FlushHandle::flush` function to fail at channel.recv() and 3. causing the `FlushHandle::flush` function to bail with the flush error, 4. leaving its caller `BufferedWriter::flush` with `BufferedWriter::mutable = None`, 5. once the InMemoryLayer's RwLock::write guard is dropped, subsequent reads can enter, 6. those reads find `mutable = None` and cause the panic. # Context It has always been the contract that writes against the BufferedWriter API must not be retried because the writer/stream-style/append-only interface makes no atomicity guarantees ("On error, did nothing or a piece of the buffer get appended?"). The idea was that the error would bubble up to upper layers that can throw away the buffered writer and create a new one. (See our [internal error handling policy document on how to handle e.g. `ENOSPC`](https://github.com/neondatabase/docs/blob/c870a50bc099d82444947a353fb302c761949c94/src/storage/handling_io_and_logical_errors.md#L36-L43)). That _might_ be true for delta/image layer writers, I haven't checked. But it's certainly not true for the ingest path: there are no provisions to throw away an InMemoryLayer that encountered a write error an reingest the WAL already written to it. Adding such higher-level retries would involve either resetting last_record_lsn to a lower value and restarting walreceiver. The code isn't flexible enough to do that, and such complexity likely isn't worth it given that write errors are rare. # Solution The solution in this PR is to retry _any_ failing write operation _indefinitely_ inside the buffered writer flush task, except of course those that are fatal as per `maybe_fatal_err`. Retrying indefinitely ensures that `BufferedWriter::mutable` is never left `None` in the case of IO errors, thereby solving the problem described above. It's a clear improvement over the status quo. However, while we're retrying, we build up backpressure because the `flush` is only double-buffered, not infinitely buffered. Backpressure here is generally good to avoid resource exhaustion, **but blocks reads** and hence stalls GetPage requests because InMemoryLayer reads and writes are mutually exclusive. That's orthogonal to the problem that is solved here, though. ## Caveats Note that there are some remaining conditions in the flush background task where it can bail with an error. I have annotated one of them with a TODO comment. Hence the `FlushHandle::flush` is still fallible and hence the overall scenario of leaving `mutable = None` on the bail path is still possible. We can clean that up in a later commit. Note also that retrying indefinitely is great for temporary errors like ENOSPC but likely undesirable in case the `std::io::Error` we get is really due to higher-level logic bugs. For example, we could fail to flush because the timeline or tenant directory got deleted and VirtualFile's reopen fails with ENOENT. Note finally that cancellation is not respected while we're retrying. This means we will block timeline/tenant/pageserver shutdown. The reason is that the existing cancellation story for the buffered writer background task was to recv from flush op channel until the sending side (FlushHandle) is explicitly shut down or dropped. Failing to handle cancellation carries the operational risk that even if a single timeline gets stuck because of a logic bug such as the one laid out above, we must still restart the whole pageserver process. # Alternatives Considered As pointed out in the `Context` section, throwing away a InMemoryLayer that encountered an error and reingesting the WAL is a lot of complexity that IMO isn't justified for such an edge case. Also, it's wasteful. I think it's a local optimum. A more general and simpler solution for ENOSPC is to `abort()` the process and run eviction on startup before bringing up the rest of pageserver. I argued for it in the past, the pro arguments are still valid and complete: https://neondb.slack.com/archives/C033RQ5SPDH/p1716896265296329 The trouble at the time was implementing eviction on startup. However, maybe things are simpler now that we are fully storcon-managed and all tenants have secondaries. For example, if pageserver `abort()`s on ENOSPC and then simply don't respond to storcon heartbeats while we're running eviction on startup, storcon will fail tenants over to the secondary anyway, giving us all the time we need to clean up. The downside is that if there's a systemic space management bug, above proposal will just propagate the problem to other nodes. But I imagine that because of the delays involved with filling up disks, the system might reach a half-stable state, providing operators more time to react. # Demo Intermediary commit `a03f335121480afc0171b0f34606bdf929e962c5` is demoed in this (internal) screen recording: https://drive.google.com/file/d/1nBC6lFV2himQ8vRXDXrY30yfWmI2JL5J/view?usp=drive_link # Perf Testing Ran `bench_ingest` on tmpfs, no measurable difference. Spans are uniquely owned by the flush task, and the span stack isn't too deep, so, enter and exit should be cheap. Plus, each flush takes ~150us with direct IO enabled, so, not _that_ high frequency event anyways. # Refs - fixes https://github.com/neondatabase/neon/issues/10856 --- pageserver/src/tenant/ephemeral_file.rs | 3 +- .../tenant/remote_timeline_client/download.rs | 3 +- pageserver/src/virtual_file.rs | 5 +- .../virtual_file/owned_buffers_io/write.rs | 9 ++- .../owned_buffers_io/write/flush.rs | 65 ++++++++++++++++--- 5 files changed, 68 insertions(+), 17 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index f048a355a8..396d930f77 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -9,7 +9,7 @@ use camino::Utf8PathBuf; use num_traits::Num; use pageserver_api::shard::TenantShardId; use tokio_epoll_uring::{BoundedBuf, Slice}; -use tracing::error; +use tracing::{error, info_span}; use utils::id::TimelineId; use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; @@ -76,6 +76,7 @@ impl EphemeralFile { || IoBufferMut::with_capacity(TAIL_SZ), gate.enter()?, ctx, + info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename), ), _gate_guard: gate.enter()?, }) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 92be2145ce..954ff0c1d6 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -18,7 +18,7 @@ use tokio::fs::{self, File, OpenOptions}; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio_util::io::StreamReader; use tokio_util::sync::CancellationToken; -use tracing::warn; +use tracing::{info_span, warn}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use utils::{backoff, pausable_failpoint}; @@ -229,6 +229,7 @@ async fn download_object( || IoBufferMut::with_capacity(super::BUFFER_SIZE), gate.enter().map_err(|_| DownloadError::Cancelled)?, ctx, + info_span!(parent: None, "download_object_buffered_writer", %dst_path), ); // TODO: use vectored write (writev) once supported by tokio-epoll-uring. diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 1da3130df0..cd3d897423 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -1299,9 +1299,8 @@ impl OwnedAsyncWriter for VirtualFile { buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> std::io::Result> { - let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await; - res.map(|_| buf) + ) -> (FullSlice, std::io::Result<()>) { + VirtualFile::write_all_at(self, buf, offset, ctx).await } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 861ca3aa2a..a7e06c0a14 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -31,7 +31,7 @@ pub trait OwnedAsyncWriter { buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> impl std::future::Future>> + Send; + ) -> impl std::future::Future, std::io::Result<()>)> + Send; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch @@ -66,6 +66,7 @@ where buf_new: impl Fn() -> B, gate_guard: utils::sync::gate::GateGuard, ctx: &RequestContext, + flush_task_span: tracing::Span, ) -> Self { Self { writer: writer.clone(), @@ -75,6 +76,7 @@ where buf_new(), gate_guard, ctx.attached_child(), + flush_task_span, ), bytes_submitted: 0, } @@ -269,12 +271,12 @@ mod tests { buf: FullSlice, offset: u64, _: &RequestContext, - ) -> std::io::Result> { + ) -> (FullSlice, std::io::Result<()>) { self.writes .lock() .unwrap() .push((Vec::from(&buf[..]), offset)); - Ok(buf) + (buf, Ok(())) } } @@ -293,6 +295,7 @@ mod tests { || IoBufferMut::with_capacity(2), gate.enter()?, ctx, + tracing::Span::none(), ); writer.write_buffered_borrowed(b"abc", ctx).await?; diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 46309d4011..e3cf9be438 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,9 +1,14 @@ +use std::ops::ControlFlow; use std::sync::Arc; +use once_cell::sync::Lazy; +use tokio_util::sync::CancellationToken; +use tracing::{Instrument, info, info_span, warn}; use utils::sync::duplex; use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter}; use crate::context::RequestContext; +use crate::virtual_file::MaybeFatalIo; use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned; use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; @@ -118,6 +123,7 @@ where buf: B, gate_guard: utils::sync::gate::GateGuard, ctx: RequestContext, + span: tracing::Span, ) -> Self where B: Buffer + Send + 'static, @@ -125,11 +131,14 @@ where // It is fine to buffer up to only 1 message. We only 1 message in-flight at a time. let (front, back) = duplex::mpsc::channel(1); - let join_handle = tokio::spawn(async move { - FlushBackgroundTask::new(back, file, gate_guard, ctx) - .run(buf.flush()) - .await - }); + let join_handle = tokio::spawn( + async move { + FlushBackgroundTask::new(back, file, gate_guard, ctx) + .run(buf.flush()) + .await + } + .instrument(span), + ); FlushHandle { inner: Some(FlushHandleInner { @@ -236,6 +245,7 @@ where /// The passed in slice is immediately sent back to the flush handle through the duplex channel. async fn run(mut self, slice: FullSlice) -> std::io::Result> { // Sends the extra buffer back to the handle. + // TODO: can this ever await and or fail? I think not. self.channel.send(slice).await.map_err(|_| { std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early") })?; @@ -251,10 +261,47 @@ where } // Write slice to disk at `offset`. - let slice = self - .writer - .write_all_at(request.slice, request.offset, &self.ctx) - .await?; + // + // Error handling happens according to the current policy of crashing + // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable). + // (The upper layers of the Pageserver write path are not equipped to retry write errors + // becasuse they often deallocate the buffers that were already written). + // + // TODO: cancellation sensitiity. + // Without it, if we hit a bug where retrying is never successful, + // then we can't shut down the timeline/tenant/pageserver cleanly because + // layers of the Pageserver write path are holding the gate open for EphemeralFile. + // + // TODO: use utils::backoff::retry once async closures are actually usable + // + let mut slice_storage = Some(request.slice); + for attempt in 1.. { + let result = async { + if attempt > 1 { + info!("retrying flush"); + } + let slice = slice_storage.take().expect( + "likely previous invocation of this future didn't get polled to completion", + ); + let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await; + slice_storage = Some(slice); + let res = res.maybe_fatal_err("owned_buffers_io flush"); + let Err(err) = res else { + return ControlFlow::Break(()); + }; + warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff"); + static NO_CANCELLATION: Lazy = Lazy::new(CancellationToken::new); + utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await; + ControlFlow::Continue(()) + } + .instrument(info_span!("flush_attempt", %attempt)) + .await; + match result { + ControlFlow::Break(()) => break, + ControlFlow::Continue(()) => continue, + } + } + let slice = slice_storage.expect("loop must have run at least once"); #[cfg(test)] {