From 66f80e77ba418c6ecab1016b6b8f13beff9f3ce3 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 9 Apr 2025 17:32:19 +0100 Subject: [PATCH 01/10] tests/performance: reconcile until idle before benchmark (#11435) We'd like to run benchmarks starting from a steady state. To this end, do a reconciliation round before proceeding with the benchmark. This is useful for benchmarks that use tenant dir snapshots since a non-standard tenant configuration is used to generate the snapshot. The storage controller is not aware of the non default tenant configuration and will reconcile while the bench is running. --- test_runner/performance/pageserver/util.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test_runner/performance/pageserver/util.py b/test_runner/performance/pageserver/util.py index 7a6d88f79c..b50659defc 100644 --- a/test_runner/performance/pageserver/util.py +++ b/test_runner/performance/pageserver/util.py @@ -40,6 +40,8 @@ def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int): for layer in info.historic_layers: assert not layer.remote + env.storage_controller.reconcile_until_idle(timeout_secs=60) + log.info("ready") From afd34291ca5152c151e067c12af50b98a65d6832 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 9 Apr 2025 11:41:29 -0500 Subject: [PATCH 02/10] Make neon_local token generation generic over claims (#11507) Instead of encoding a certain structure for claims, let's allow the caller to specify what claims be encoded. Signed-off-by: Tristan Partin --- control_plane/src/local_env.rs | 4 ++-- libs/utils/src/auth.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 2616afbb16..8e2a110366 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -15,7 +15,7 @@ use clap::ValueEnum; use postgres_backend::AuthType; use reqwest::Url; use serde::{Deserialize, Serialize}; -use utils::auth::{Claims, encode_from_key_file}; +use utils::auth::encode_from_key_file; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage}; @@ -757,7 +757,7 @@ impl LocalEnv { } // this function is used only for testing purposes in CLI e g generate tokens during init - pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result { + pub fn generate_auth_token(&self, claims: &S) -> anyhow::Result { let private_key_path = self.get_private_key_path(); let key_data = fs::read(private_key_path)?; encode_from_key_file(claims, &key_data) diff --git a/libs/utils/src/auth.rs b/libs/utils/src/auth.rs index cc5b0b1d13..db4fc5685c 100644 --- a/libs/utils/src/auth.rs +++ b/libs/utils/src/auth.rs @@ -173,7 +173,7 @@ impl std::fmt::Debug for JwtAuth { } // this function is used only for testing purposes in CLI e g generate tokens during init -pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result { +pub fn encode_from_key_file(claims: &S, key_data: &[u8]) -> Result { let key = EncodingKey::from_ed_pem(key_data)?; Ok(encode(&Header::new(STORAGE_TOKEN_ALGORITHM), claims, &key)?) } From 1c237d0c6d5eca08134f18282aa352e4d67bc4c0 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 9 Apr 2025 11:58:44 -0500 Subject: [PATCH 03/10] Move compute_ctl claims struct into public API (#11505) This is preparatory work for teaching neon_local to pass the Authorization header to compute_ctl. Signed-off-by: Tristan Partin --- compute_tools/src/http/middleware/authorize.rs | 15 +++++++-------- libs/compute_api/src/requests.rs | 8 ++++++++ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/compute_tools/src/http/middleware/authorize.rs b/compute_tools/src/http/middleware/authorize.rs index 89d55e1af3..ee3a5cb953 100644 --- a/compute_tools/src/http/middleware/authorize.rs +++ b/compute_tools/src/http/middleware/authorize.rs @@ -6,20 +6,15 @@ use axum_extra::{ TypedHeader, headers::{Authorization, authorization::Bearer}, }; +use compute_api::requests::ComputeClaims; use futures::future::BoxFuture; use http::{Request, Response, StatusCode}; use jsonwebtoken::{Algorithm, DecodingKey, TokenData, Validation, jwk::JwkSet}; -use serde::Deserialize; use tower_http::auth::AsyncAuthorizeRequest; use tracing::warn; use crate::http::{JsonResponse, extract::RequestId}; -#[derive(Clone, Debug, Deserialize)] -pub(in crate::http) struct Claims { - compute_id: String, -} - #[derive(Clone, Debug)] pub(in crate::http) struct Authorize { compute_id: String, @@ -112,7 +107,11 @@ impl AsyncAuthorizeRequest for Authorize { impl Authorize { /// Verify the token using the JSON Web Key set and return the token data. - fn verify(jwks: &JwkSet, token: &str, validation: &Validation) -> Result> { + fn verify( + jwks: &JwkSet, + token: &str, + validation: &Validation, + ) -> Result> { for jwk in jwks.keys.iter() { let decoding_key = match DecodingKey::from_jwk(jwk) { Ok(key) => key, @@ -127,7 +126,7 @@ impl Authorize { } }; - match jsonwebtoken::decode::(token, &decoding_key, validation) { + match jsonwebtoken::decode::(token, &decoding_key, validation) { Ok(data) => return Ok(data), Err(e) => { warn!( diff --git a/libs/compute_api/src/requests.rs b/libs/compute_api/src/requests.rs index 3fbdfcf83f..98f2fc297c 100644 --- a/libs/compute_api/src/requests.rs +++ b/libs/compute_api/src/requests.rs @@ -5,6 +5,14 @@ use crate::privilege::Privilege; use crate::responses::ComputeCtlConfig; use crate::spec::{ComputeSpec, ExtVersion, PgIdent}; +/// When making requests to the `compute_ctl` external HTTP server, the client +/// must specify a set of claims in `Authorization` header JWTs such that +/// `compute_ctl` can authorize the request. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct ComputeClaims { + pub compute_id: String, +} + /// Request of the /configure API /// /// We now pass only `spec` in the configuration request, but later we can From af12647b9dc29811bbd31716d6ccd8b52809a20a Mon Sep 17 00:00:00 2001 From: Peter Bendel Date: Wed, 9 Apr 2025 19:11:00 +0200 Subject: [PATCH 04/10] large tenant oltp benchmark: reindex with downtime (remove concurrently) (#11498) ## Problem our large oltp benchmark runs very long - we want to remove the duration of the reindex step. we don't run concurrent workload anyhow but added "concurrently" only to have a "prod-like" approach. But if it just doubles the time we report because it requires two instead of one full table scan we can remove it ## Summary of changes remove keyword concurrently from the reindex step --- .../performance/test_perf_oltp_large_tenant.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/test_runner/performance/test_perf_oltp_large_tenant.py b/test_runner/performance/test_perf_oltp_large_tenant.py index 957a4ec796..b45394d627 100644 --- a/test_runner/performance/test_perf_oltp_large_tenant.py +++ b/test_runner/performance/test_perf_oltp_large_tenant.py @@ -145,11 +145,14 @@ def run_database_maintenance(env: PgCompare): END $$; """ ) - - log.info("start REINDEX TABLE CONCURRENTLY transaction.transaction") - with env.zenbenchmark.record_duration("reindex concurrently"): - cur.execute("REINDEX TABLE CONCURRENTLY transaction.transaction;") - log.info("finished REINDEX TABLE CONCURRENTLY transaction.transaction") + # in production a customer would likely use reindex concurrently + # but for our test we don't care about the downtime + # and it would just about double the time we report in the test + # because we need one more table scan for each index + log.info("start REINDEX TABLE transaction.transaction") + with env.zenbenchmark.record_duration("reindex"): + cur.execute("REINDEX TABLE transaction.transaction;") + log.info("finished REINDEX TABLE transaction.transaction") @pytest.mark.parametrize("custom_scripts", get_custom_scripts()) From ec66b788e2c66b883e9e4450a2571e25902014f2 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 9 Apr 2025 14:01:31 -0400 Subject: [PATCH 05/10] fix(pageserver): use different walredo retry setting for gc-compaction (#11497) ## Problem Not a complete fix for https://github.com/neondatabase/neon/issues/11492 but should work for a short term. Our current retry strategy for walredo is to retry every request exactly once. This retry doesn't make sense because it retries all requests exactly once and each error is expected to cause process restart and cause future requests to fail. I'll explain it with a scenario of two threads requesting redos: one with an invalid history (that will cause walredo to panic) and another that has a correct redo sequence. First let's look at how we handle retries right now in do_with_walredo_process. At the beginning of the function it will spawn a new process if there's no existing one. Then it will continue to redo. If the process fails, the first process that encounters the error will remove the walredo process object from the OnceCell, so that the next time it gets accessed, a new process will be spawned; if it is the last one that uses the old walredo process, it will kill and wait the process in `drop(proc)`. I'm skeptical whether this works under races but I think this is not the root cause of the problem. In this retry handler, if there are N requests attached to a walredo process and the i-th request fails (panics the walredo), all other N-i requests will fail and they need to retry so that they can access a new walredo process. ``` time ----> proc A None B request 1 ^-----------------^ fail uses A for redo replace with None request 2 ^-------------------- fail uses A for redo request 3 ^----------------^ fail uses A for redo last ref, wait for A to be killed request 4 ^--------------- None, spawn new process B ``` The problem is with our retry strategy. Normally, for a system that we want to retry on, the probability of errors for each of the requests are uncorrelated. However, in walredo, a prior request that panics the walredo process will cause all future walredo on that process to fail (that's correlated). So, back to the situation where we have 2 requests where one will definitely fail and the other will succeed and we get the following sequence, where retry attempts = 1, * new walredo process A starts. * request 1 (invalid) being processed on A and panics A, waiting for retry, remove process A from the process object. * request 2 (valid) being processed on A and receives pipe broken / poisoned process error, waiting for retry, wait for A to be killed -- this very likely takes a while and cannot finish before request 1 gets processed again * new walredo process B starts. * request 1 (invalid) being processed again on B and panics B, the whole request fail. * request 2 (valid) being processed again on B, and get a poisoned error again. ``` time ----> proc A None B None request 1 ^-----------------^--------------^--------------------^ spawn A for redo fail spawn B for redo fail request 2 ^--------------------^-------------------------^------------^ use A for redo fail, wait to kill A B for redo fail again ``` In such cases, no matter how we set n_attempts, as long as the retry count applies to all requests, this sequence is bound to fail both requests because of how they get sequenced; while we could potentially make request 2 successful. There are many solutions to this -- like having a separate walredo manager for compactions, or define which errors are retryable (i.e., broken pipe can be retried, while real walredo error won't be retried), or having a exclusive big lock over the whole redo process (the current one is very fine-grained). In this patch, we go with a simple approach: use different retry attempts for different types of requests. For gc-compaction, the attempt count is set to 0, so that it never retries and consequently stops the compaction process -- no more redo will be issued from gc-compaction. Once the walredo process gets restarted, the normal read requests will proceed normally. ## Summary of changes Add redo_attempt for each reconstruct value request to set different retry policies. --------- Signed-off-by: Alex Chi Z Co-authored-by: Erik Grinaker --- pageserver/benches/bench_walredo.rs | 11 ++++- pageserver/src/tenant.rs | 8 ++-- pageserver/src/tenant/timeline.rs | 50 ++++++++++---------- pageserver/src/tenant/timeline/compaction.rs | 6 +-- pageserver/src/walredo.rs | 27 ++++++++++- 5 files changed, 66 insertions(+), 36 deletions(-) diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 77b3f90b3e..215682d90c 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -65,7 +65,7 @@ use bytes::{Buf, Bytes}; use criterion::{BenchmarkId, Criterion}; use once_cell::sync::Lazy; use pageserver::config::PageServerConf; -use pageserver::walredo::PostgresRedoManager; +use pageserver::walredo::{PostgresRedoManager, RedoAttemptType}; use pageserver_api::key::Key; use pageserver_api::record::NeonWalRecord; use pageserver_api::shard::TenantShardId; @@ -223,7 +223,14 @@ impl Request { // TODO: avoid these clones manager - .request_redo(*key, *lsn, base_img.clone(), records.clone(), *pg_version) + .request_redo( + *key, + *lsn, + base_img.clone(), + records.clone(), + *pg_version, + RedoAttemptType::ReadPage, + ) .await .context("request_redo") } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2ac2fd0b81..d3623fc3b9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -100,7 +100,7 @@ use crate::tenant::timeline::delete::DeleteTimelineFlow; use crate::tenant::timeline::uninit::cleanup_timeline_directory; use crate::virtual_file::VirtualFile; use crate::walingest::WalLagCooldown; -use crate::walredo::PostgresRedoManager; +use crate::walredo::{PostgresRedoManager, RedoAttemptType}; use crate::{InitializationOrder, TEMP_FILE_SUFFIX, import_datadir, span, task_mgr, walredo}; static INIT_DB_SEMAPHORE: Lazy = Lazy::new(|| Semaphore::new(8)); @@ -473,15 +473,16 @@ impl WalRedoManager { base_img: Option<(Lsn, bytes::Bytes)>, records: Vec<(Lsn, pageserver_api::record::NeonWalRecord)>, pg_version: u32, + redo_attempt_type: RedoAttemptType, ) -> Result { match self { Self::Prod(_, mgr) => { - mgr.request_redo(key, lsn, base_img, records, pg_version) + mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type) .await } #[cfg(test)] Self::Test(mgr) => { - mgr.request_redo(key, lsn, base_img, records, pg_version) + mgr.request_redo(key, lsn, base_img, records, pg_version, redo_attempt_type) .await } } @@ -5879,6 +5880,7 @@ pub(crate) mod harness { base_img: Option<(Lsn, Bytes)>, records: Vec<(Lsn, NeonWalRecord)>, _pg_version: u32, + _redo_attempt_type: RedoAttemptType, ) -> Result { let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1)); if records_neon { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b10409689c..8a4a6f4b40 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -24,6 +24,7 @@ use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use crate::PERF_TRACE_TARGET; +use crate::walredo::RedoAttemptType; use anyhow::{Context, Result, anyhow, bail, ensure}; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; @@ -1293,6 +1294,12 @@ impl Timeline { }; reconstruct_state.read_path = read_path; + let redo_attempt_type = if ctx.task_kind() == TaskKind::Compaction { + RedoAttemptType::LegacyCompaction + } else { + RedoAttemptType::ReadPage + }; + let traversal_res: Result<(), _> = { let ctx = RequestContextBuilder::from(ctx) .perf_span(|crnt_perf_span| { @@ -1380,7 +1387,7 @@ impl Timeline { let walredo_deltas = converted.num_deltas(); let walredo_res = walredo_self - .reconstruct_value(key, lsn, converted) + .reconstruct_value(key, lsn, converted, redo_attempt_type) .maybe_perf_instrument(&ctx, |crnt_perf_span| { info_span!( target: PERF_TRACE_TARGET, @@ -6343,37 +6350,21 @@ impl Timeline { /// Reconstruct a value, using the given base image and WAL records in 'data'. async fn reconstruct_value( - &self, - key: Key, - request_lsn: Lsn, - data: ValueReconstructState, - ) -> Result { - self.reconstruct_value_inner(key, request_lsn, data, false) - .await - } - - /// Reconstruct a value, using the given base image and WAL records in 'data'. It does not fire critical errors because - /// sometimes it is expected to fail due to unreplayable history described in . - async fn reconstruct_value_wo_critical_error( - &self, - key: Key, - request_lsn: Lsn, - data: ValueReconstructState, - ) -> Result { - self.reconstruct_value_inner(key, request_lsn, data, true) - .await - } - - async fn reconstruct_value_inner( &self, key: Key, request_lsn: Lsn, mut data: ValueReconstructState, - no_critical_error: bool, + redo_attempt_type: RedoAttemptType, ) -> Result { // Perform WAL redo if needed data.records.reverse(); + let fire_critical_error = match redo_attempt_type { + RedoAttemptType::ReadPage => true, + RedoAttemptType::LegacyCompaction => true, + RedoAttemptType::GcCompaction => false, + }; + // If we have a page image, and no WAL, we're all set if data.records.is_empty() { if let Some((img_lsn, img)) = &data.img { @@ -6420,13 +6411,20 @@ impl Timeline { .as_ref() .context("timeline has no walredo manager") .map_err(PageReconstructError::WalRedo)? - .request_redo(key, request_lsn, data.img, data.records, self.pg_version) + .request_redo( + key, + request_lsn, + data.img, + data.records, + self.pg_version, + redo_attempt_type, + ) .await; let img = match res { Ok(img) => img, Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled), Err(walredo::Error::Other(err)) => { - if !no_critical_error { + if fire_critical_error { critical!("walredo failure during page reconstruction: {err:?}"); } return Err(PageReconstructError::WalRedo( diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 8403c0a7d9..5f969a4e77 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -16,6 +16,8 @@ use super::{ Timeline, }; +use crate::tenant::timeline::DeltaEntry; +use crate::walredo::RedoAttemptType; use anyhow::{Context, anyhow}; use bytes::Bytes; use enumset::EnumSet; @@ -2411,7 +2413,7 @@ impl Timeline { lsn_split_points[i] }; let img = self - .reconstruct_value_wo_critical_error(key, request_lsn, state) + .reconstruct_value(key, request_lsn, state, RedoAttemptType::GcCompaction) .await?; Some((request_lsn, img)) } else { @@ -3909,8 +3911,6 @@ impl CompactionLayer for OwnArc { } } -use crate::tenant::timeline::DeltaEntry; - impl CompactionLayer for ResidentDeltaLayer { fn key_range(&self) -> &Range { &self.0.layer_desc().key_range diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 22d8d83811..ed8a954369 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -136,6 +136,16 @@ macro_rules! bail { } } +#[derive(Debug, Clone, Copy)] +pub enum RedoAttemptType { + /// Used for the read path. Will fire critical errors and retry twice if failure. + ReadPage, + // Used for legacy compaction (only used in image compaction). Will fire critical errors and retry once if failure. + LegacyCompaction, + // Used for gc compaction. Will not fire critical errors and not retry. + GcCompaction, +} + /// /// Public interface of WAL redo manager /// @@ -156,11 +166,18 @@ impl PostgresRedoManager { base_img: Option<(Lsn, Bytes)>, records: Vec<(Lsn, NeonWalRecord)>, pg_version: u32, + redo_attempt_type: RedoAttemptType, ) -> Result { if records.is_empty() { bail!("invalid WAL redo request with no records"); } + let max_retry_attempts = match redo_attempt_type { + RedoAttemptType::ReadPage => 2, + RedoAttemptType::LegacyCompaction => 1, + RedoAttemptType::GcCompaction => 0, + }; + let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID); let mut img = base_img.map(|p| p.1); let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1); @@ -180,6 +197,7 @@ impl PostgresRedoManager { &records[batch_start..i], self.conf.wal_redo_timeout, pg_version, + max_retry_attempts, ) .await }; @@ -201,6 +219,7 @@ impl PostgresRedoManager { &records[batch_start..], self.conf.wal_redo_timeout, pg_version, + max_retry_attempts, ) .await } @@ -424,11 +443,11 @@ impl PostgresRedoManager { records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, pg_version: u32, + max_retry_attempts: u32, ) -> Result { *(self.last_redo_at.lock().unwrap()) = Some(Instant::now()); let (rel, blknum) = key.to_rel_block().context("invalid record")?; - const MAX_RETRY_ATTEMPTS: u32 = 1; let mut n_attempts = 0u32; loop { let base_img = &base_img; @@ -486,7 +505,7 @@ impl PostgresRedoManager { info!(n_attempts, "retried walredo succeeded"); } n_attempts += 1; - if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() { + if n_attempts > max_retry_attempts || result.is_ok() { return result; } } @@ -560,6 +579,7 @@ mod tests { use super::PostgresRedoManager; use crate::config::PageServerConf; + use crate::walredo::RedoAttemptType; #[tokio::test] async fn test_ping() { @@ -593,6 +613,7 @@ mod tests { None, short_records(), 14, + RedoAttemptType::ReadPage, ) .instrument(h.span()) .await @@ -621,6 +642,7 @@ mod tests { None, short_records(), 14, + RedoAttemptType::ReadPage, ) .instrument(h.span()) .await @@ -642,6 +664,7 @@ mod tests { None, short_records(), 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */ + RedoAttemptType::ReadPage, ) .instrument(h.span()) .await From 2c21a65b0b3776b2ed938fb2404f99ea312df148 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 9 Apr 2025 14:07:58 -0400 Subject: [PATCH 06/10] feat(pageserver): add gc-compaction time-to-first-item stats (#11475) ## Problem In some cases gc-compaction doesn't respond to the L0 compaction yield notifier. I suspect it's stuck on getting the first item, and if so, we probably need to let L0 yield notifier preempt `next_with_trace`. ## Summary of changes - Add `time_to_first_kv_pair` to gc-compaction statistics. - Inverse the ratio so that smaller ratio -> better compaction ratio. --------- Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 30 +++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 5f969a4e77..8e3be8e7f4 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -7,7 +7,7 @@ use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use super::layer_manager::LayerManager; use super::{ @@ -821,15 +821,16 @@ pub struct CompactionStatistics { time_acquire_lock_secs: f64, time_analyze_secs: f64, time_download_layer_secs: f64, + time_to_first_kv_pair_secs: f64, time_main_loop_secs: f64, time_final_phase_secs: f64, time_total_secs: f64, // Summary - /// Ratio of the key-value size before/after gc-compaction. - uncompressed_size_ratio: f64, - /// Ratio of the physical size before/after gc-compaction. - physical_size_ratio: f64, + /// Ratio of the key-value size after/before gc-compaction. + uncompressed_retention_ratio: f64, + /// Ratio of the physical size after/before gc-compaction. + compressed_retention_ratio: f64, } impl CompactionStatistics { @@ -898,15 +899,15 @@ impl CompactionStatistics { fn finalize(&mut self) { let original_key_value_size = self.image_keys_visited.size + self.wal_keys_visited.size; let produced_key_value_size = self.image_produced.size + self.wal_produced.size; - self.uncompressed_size_ratio = - original_key_value_size as f64 / (produced_key_value_size as f64 + 1.0); // avoid div by 0 + self.uncompressed_retention_ratio = + produced_key_value_size as f64 / (original_key_value_size as f64 + 1.0); // avoid div by 0 let original_physical_size = self.image_layer_visited.size + self.delta_layer_visited.size; let produced_physical_size = self.image_layer_produced.size + self.delta_layer_produced.size + self.image_layer_discarded.size + self.delta_layer_discarded.size; // Also include the discarded layers to make the ratio accurate - self.physical_size_ratio = - original_physical_size as f64 / (produced_physical_size as f64 + 1.0); // avoid div by 0 + self.compressed_retention_ratio = + produced_physical_size as f64 / (original_physical_size as f64 + 1.0); // avoid div by 0 } } @@ -3034,7 +3035,7 @@ impl Timeline { .map_err(CompactionError::Other)?; let time_download_layer = timer.elapsed(); - let timer = Instant::now(); + let mut timer = Instant::now(); // Step 2: Produce images+deltas. let mut accumulated_values = Vec::new(); @@ -3109,6 +3110,7 @@ impl Timeline { // Actually, we can decide not to write to the image layer at all at this point because // the key and LSN range are determined. However, to keep things simple here, we still // create this writer, and discard the writer in the end. + let mut time_to_first_kv_pair = None; while let Some(((key, lsn, val), desc)) = merge_iter .next_with_trace() @@ -3116,6 +3118,11 @@ impl Timeline { .context("failed to get next key-value pair") .map_err(CompactionError::Other)? { + if time_to_first_kv_pair.is_none() { + time_to_first_kv_pair = Some(timer.elapsed()); + timer = Instant::now(); + } + if cancel.is_cancelled() { return Err(CompactionError::ShuttingDown); } @@ -3451,6 +3458,9 @@ impl Timeline { let time_final_phase = timer.elapsed(); stat.time_final_phase_secs = time_final_phase.as_secs_f64(); + stat.time_to_first_kv_pair_secs = time_to_first_kv_pair + .unwrap_or(Duration::ZERO) + .as_secs_f64(); stat.time_main_loop_secs = time_main_loop.as_secs_f64(); stat.time_acquire_lock_secs = time_acquire_lock.as_secs_f64(); stat.time_download_layer_secs = time_download_layer.as_secs_f64(); From 63ee8e218195e42daa305085ad847a38ceda93cb Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 9 Apr 2025 21:03:49 +0200 Subject: [PATCH 07/10] test_runner: ignore `.___temp` files in `evict_random_layers` (#11509) ## Problem `test_location_conf_churn` often fails with `neither image nor delta layer`, but doesn't say what the file actually is. However, past local failures have indicated that it might be `.___temp` files. Touches https://github.com/neondatabase/neon/issues/11348. ## Summary of changes Ignore `.___temp` files when evicting local layers, and include the file name in the error message. --- test_runner/fixtures/pageserver/common_types.py | 2 +- test_runner/regress/test_pageserver_secondary.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test_runner/fixtures/pageserver/common_types.py b/test_runner/fixtures/pageserver/common_types.py index 0e068db593..0a92883e96 100644 --- a/test_runner/fixtures/pageserver/common_types.py +++ b/test_runner/fixtures/pageserver/common_types.py @@ -105,7 +105,7 @@ def parse_layer_file_name(file_name: str) -> LayerName: except InvalidFileName: pass - raise InvalidFileName("neither image nor delta layer") + raise InvalidFileName(f"neither image nor delta layer: {file_name}") def is_future_layer(layer_file_name: LayerName, disk_consistent_lsn: Lsn): diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index c73a592d98..d03d05d33d 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -61,7 +61,7 @@ def evict_random_layers( ) client = pageserver.http_client() for layer in initial_local_layers: - if "ephemeral" in layer.name or "temp_download" in layer.name: + if "ephemeral" in layer.name or "temp_download" in layer.name or ".___temp" in layer.name: continue layer_name = parse_layer_file_name(layer.name) From 405a17bf0b3bf2aa646bf5be561112408733e36c Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 9 Apr 2025 16:57:50 -0400 Subject: [PATCH 08/10] fix(pageserver): ensure gc-compaction gets preempted by L0 (#11512) ## Problem Part of #9114 ## Summary of changes Gc-compaction flag was not correctly set, causing it not getting preempted by L0. Signed-off-by: Alex Chi Z --- pageserver/src/http/routes.rs | 1 + pageserver/src/tenant/timeline/compaction.rs | 3 + test_runner/regress/test_compaction.py | 59 +++++++++++++++++++- 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 200b91fc82..9bb761dc48 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2274,6 +2274,7 @@ async fn timeline_compact_handler( if Some(true) == parse_query_param::<_, bool>(&request, "dry_run")? { flags |= CompactFlags::DryRun; } + // Manual compaction does not yield for L0. let wait_until_uploaded = parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 8e3be8e7f4..7b1969f209 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -317,6 +317,9 @@ impl GcCompactionQueue { flags: { let mut flags = EnumSet::new(); flags |= CompactFlags::EnhancedGcBottomMostCompaction; + if timeline.get_compaction_l0_first() { + flags |= CompactFlags::YieldForL0; + } flags }, sub_compaction: true, diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 6789939e0c..087fafb327 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -38,12 +38,34 @@ PREEMPT_COMPACTION_TENANT_CONF = { "compaction_target_size": 1024**2, "image_creation_threshold": 1, "image_creation_preempt_threshold": 1, - # compact more frequently + # Compact more frequently "compaction_threshold": 3, "compaction_upper_limit": 6, "lsn_lease_length": "0s", } +PREEMPT_GC_COMPACTION_TENANT_CONF = { + "gc_period": "5s", + "compaction_period": "5s", + # Small checkpoint distance to create many layers + "checkpoint_distance": 1024**2, + # Compact small layers + "compaction_target_size": 1024**2, + "image_creation_threshold": 10000, # Do not create image layers at all + "image_creation_preempt_threshold": 10000, + # Compact more frequently + "compaction_threshold": 3, + "compaction_upper_limit": 6, + "lsn_lease_length": "0s", + # Enable gc-compaction + "gc_compaction_enabled": "true", + "gc_compaction_initial_threshold_kb": 1024, # At a small threshold + "gc_compaction_ratio_percent": 1, + # No PiTR interval and small GC horizon + "pitr_interval": "0s", + "gc_horizon": f"{1024**2}", +} + @skip_in_debug_build("only run with release build") @pytest.mark.parametrize( @@ -165,6 +187,41 @@ def test_pageserver_compaction_preempt( env.pageserver.assert_log_contains("resuming image layer creation") +@skip_in_debug_build("only run with release build") +def test_pageserver_gc_compaction_preempt( + neon_env_builder: NeonEnvBuilder, +): + # Ideally we should be able to do unit tests for this, but we need real Postgres + # WALs in order to do unit testing... + + conf = PREEMPT_GC_COMPACTION_TENANT_CONF.copy() + env = neon_env_builder.init_start(initial_tenant_conf=conf) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + row_count = 200000 + churn_rounds = 10 + + ps_http = env.pageserver.http_client() + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageserver.id) + + log.info("Writing initial data ...") + workload.write_rows(row_count, env.pageserver.id) + + for i in range(1, churn_rounds + 1): + log.info(f"Running churn round {i}/{churn_rounds} ...") + workload.churn_rows(row_count, env.pageserver.id, upload=False) + workload.validate(env.pageserver.id) + ps_http.timeline_compact(tenant_id, timeline_id, wait_until_uploaded=True) + log.info("Validating at workload end ...") + workload.validate(env.pageserver.id) + # ensure gc_compaction gets preempted and then resumed + env.pageserver.assert_log_contains("preempt gc-compaction") + + @skip_in_debug_build("only run with release build") @pytest.mark.timeout(900) # This test is slow with sanitizers enabled, especially on ARM @pytest.mark.parametrize( From af0be11503dc94e78b9f664d1d1096004eab9a4a Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 9 Apr 2025 17:41:11 -0400 Subject: [PATCH 09/10] fix(pageserver): ensure gc-compaction gets preempted by L0 (#11512) ## Problem Part of #9114 ## Summary of changes Gc-compaction flag was not correctly set, causing it not getting preempted by L0. Signed-off-by: Alex Chi Z From a04e33ceb638a3ee5fef8d642b57ffc3a4543c98 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 9 Apr 2025 17:39:54 -0500 Subject: [PATCH 10/10] Remove --spec-json argument from compute_ctl (#11510) It isn't used by the production control plane or neon_local. The removal simplifies compute spec logic just a little bit more since we can remove any notion of whether we should allow live reconfigurations. Signed-off-by: Tristan Partin --- compute_tools/src/bin/compute_ctl.rs | 28 +++++++--------------- compute_tools/src/compute.rs | 14 ----------- compute_tools/src/http/routes/configure.rs | 7 ------ 3 files changed, 9 insertions(+), 40 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index da11ac2860..4796a07d92 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -118,16 +118,18 @@ struct Cli { #[arg(long)] pub set_disk_quota_for_fs: Option, - #[arg(short = 's', long = "spec", group = "spec")] - pub spec_json: Option, - #[arg(short = 'S', long, group = "spec-path")] pub spec_path: Option, #[arg(short = 'i', long, group = "compute-id")] pub compute_id: String, - #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")] + #[arg( + short = 'p', + long, + conflicts_with = "spec-path", + value_name = "CONTROL_PLANE_API_BASE_URL" + )] pub control_plane_uri: Option, } @@ -172,7 +174,6 @@ fn main() -> Result<()> { cgroup: cli.cgroup, #[cfg(target_os = "linux")] vm_monitor_addr: cli.vm_monitor_addr, - live_config_allowed: cli_spec.live_config_allowed, }, cli_spec.spec, cli_spec.compute_ctl_config, @@ -201,23 +202,12 @@ async fn init() -> Result<()> { } fn try_spec_from_cli(cli: &Cli) -> Result { - // First, try to get cluster spec from the cli argument - if let Some(ref spec_json) = cli.spec_json { - info!("got spec from cli argument {}", spec_json); - return Ok(CliSpecParams { - spec: Some(serde_json::from_str(spec_json)?), - compute_ctl_config: ComputeCtlConfig::default(), - live_config_allowed: false, - }); - } - - // Second, try to read it from the file if path is provided + // First, read spec from the path if provided if let Some(ref spec_path) = cli.spec_path { let file = File::open(Path::new(spec_path))?; return Ok(CliSpecParams { spec: Some(serde_json::from_reader(file)?), compute_ctl_config: ComputeCtlConfig::default(), - live_config_allowed: true, }); } @@ -225,11 +215,12 @@ fn try_spec_from_cli(cli: &Cli) -> Result { panic!("must specify --control-plane-uri"); }; + // If the spec wasn't provided in the CLI arguments, then retrieve it from + // the control plane match get_spec_from_control_plane(cli.control_plane_uri.as_ref().unwrap(), &cli.compute_id) { Ok(resp) => Ok(CliSpecParams { spec: resp.0, compute_ctl_config: resp.1, - live_config_allowed: true, }), Err(e) => { error!( @@ -247,7 +238,6 @@ struct CliSpecParams { spec: Option, #[allow(dead_code)] compute_ctl_config: ComputeCtlConfig, - live_config_allowed: bool, } fn deinit_and_exit(exit_code: Option) -> ! { diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 9dfcde1dbc..ad8925e7ab 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -93,20 +93,6 @@ pub struct ComputeNodeParams { /// the address of extension storage proxy gateway pub ext_remote_storage: Option, - - /// We should only allow live re- / configuration of the compute node if - /// it uses 'pull model', i.e. it can go to control-plane and fetch - /// the latest configuration. Otherwise, there could be a case: - /// - we start compute with some spec provided as argument - /// - we push new spec and it does reconfiguration - /// - but then something happens and compute pod / VM is destroyed, - /// so k8s controller starts it again with the **old** spec - /// - /// and the same for empty computes: - /// - we started compute without any spec - /// - we push spec and it does configuration - /// - but then it is restarted without any spec again - pub live_config_allowed: bool, } /// Compute node info shared across several `compute_ctl` threads. diff --git a/compute_tools/src/http/routes/configure.rs b/compute_tools/src/http/routes/configure.rs index 3c5a6a6d41..f7a19da611 100644 --- a/compute_tools/src/http/routes/configure.rs +++ b/compute_tools/src/http/routes/configure.rs @@ -22,13 +22,6 @@ pub(in crate::http) async fn configure( State(compute): State>, request: Json, ) -> Response { - if !compute.params.live_config_allowed { - return JsonResponse::error( - StatusCode::PRECONDITION_FAILED, - "live configuration is not allowed for this compute node".to_string(), - ); - } - let pspec = match ParsedSpec::try_from(request.spec.clone()) { Ok(p) => p, Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),