From 2f71eda00fbfea3ee3382fef3024b6376676726e Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 9 Jul 2025 18:12:59 +0200 Subject: [PATCH 01/15] pageserver/client_grpc: add separate pools for bulk requests (#12475) ## Problem GetPage bulk requests such as prefetches and vacuum can head-of-line block foreground requests, causing increased latency. Touches #11735. Requires #12469. ## Summary of changes * Use dedicated channel/client/stream pools for bulk GetPage requests. * Use lower concurrency but higher queue depth for bulk pools. * Make pool limits configurable. * Require unbounded client pool for stream pool, to avoid accidental starvation. --- pageserver/client_grpc/src/client.rs | 112 +++++++++++++++----- pageserver/client_grpc/src/pool.rs | 148 +++++++++++++++------------ pageserver/page_api/src/model.rs | 15 ++- 3 files changed, 180 insertions(+), 95 deletions(-) diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index c21ce2e47d..63852868c3 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::num::NonZero; use std::sync::Arc; use anyhow::anyhow; @@ -15,6 +16,32 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber}; +/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up +/// when full. +/// +/// TODO: tune all of these constants, and consider making them configurable. +/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels +/// with only streams. +const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); + +/// Max number of concurrent unary request clients per shard. +const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); + +/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage +/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. +const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); + +/// Max number of pipelined requests per stream. +const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); + +/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these +/// are more throughput-oriented, we have a smaller limit but higher queue depth. +const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); + +/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus +/// get a larger queue depth. +const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); + /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the /// basic `page_api::Client` gRPC client, and supports: /// @@ -87,6 +114,7 @@ impl PageserverClient { /// errors. All responses will have `GetPageStatusCode::Ok`. #[instrument(skip_all, fields( req_id = %req.request_id, + class = %req.request_class, rel = %req.rel, blkno = %req.block_numbers[0], blks = %req.block_numbers.len(), @@ -141,7 +169,11 @@ impl PageserverClient { let resp = self .retry .with(async || { - let stream = self.shards.get(shard_id)?.stream().await; + let stream = self + .shards + .get(shard_id)? + .stream(req.request_class.is_bulk()) + .await; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -270,17 +302,22 @@ impl Shards { } } -/// A single shard. +/// A single shard. 
Uses dedicated resource pools with the following structure: /// -/// TODO: consider separate pools for normal and bulk traffic, with different settings. +/// * Channel pool: unbounded. +/// * Unary client pool: MAX_UNARY_CLIENTS. +/// * Stream client pool: unbounded. +/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH. +/// * Bulk channel pool: unbounded. +/// * Bulk client pool: unbounded. +/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. struct Shard { - /// Dedicated channel pool for this shard. Shared by all clients/streams in this shard. - _channel_pool: Arc, - /// Unary gRPC client pool for this shard. Uses the shared channel pool. + /// Unary gRPC client pool. client_pool: Arc, - /// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel - /// pool with unary clients. + /// GetPage stream pool. stream_pool: Arc, + /// GetPage stream pool for bulk requests, e.g. prefetches. + bulk_stream_pool: Arc, } impl Shard { @@ -297,34 +334,53 @@ impl Shard { return Err(anyhow!("invalid shard URL {url}: must use gRPC")); } - // Use a common channel pool for all clients, to multiplex unary and stream requests across - // the same TCP connections. The channel pool is unbounded (but client pools are bounded). - let channel_pool = ChannelPool::new(url)?; + // Common channel pool for unary and stream requests. Bounded by client/stream pools. + let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; - // Dedicated client pool for unary requests. + // Client pool for unary requests. let client_pool = ClientPool::new( channel_pool.clone(), tenant_id, timeline_id, shard_id, auth_token.clone(), + Some(MAX_UNARY_CLIENTS), ); - // Stream pool with dedicated client pool. If this shared a client pool with unary requests, - // long-lived streams could fill up the client pool and starve out unary requests. It shares - // the same underlying channel pool with unary clients though, which is unbounded. - let stream_pool = StreamPool::new(ClientPool::new( - channel_pool.clone(), - tenant_id, - timeline_id, - shard_id, - auth_token, - )); + // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients, + // but shares a channel pool with it (as it's unbounded). + let stream_pool = StreamPool::new( + ClientPool::new( + channel_pool.clone(), + tenant_id, + timeline_id, + shard_id, + auth_token.clone(), + None, // unbounded, limited by stream pool + ), + Some(MAX_STREAMS), + MAX_STREAM_QUEUE_DEPTH, + ); + + // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools + // to avoid head-of-line blocking of latency-sensitive requests. + let bulk_stream_pool = StreamPool::new( + ClientPool::new( + ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?, + tenant_id, + timeline_id, + shard_id, + auth_token, + None, // unbounded, limited by stream pool + ), + Some(MAX_BULK_STREAMS), + MAX_BULK_STREAM_QUEUE_DEPTH, + ); Ok(Self { - _channel_pool: channel_pool, client_pool, stream_pool, + bulk_stream_pool, }) } @@ -336,8 +392,12 @@ impl Shard { .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } - /// Returns a pooled stream for this shard. - async fn stream(&self) -> StreamGuard { - self.stream_pool.get().await + /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream + /// pool (e.g. for prefetches). 
+ async fn stream(&self, bulk: bool) -> StreamGuard { + match bulk { + false => self.stream_pool.get().await, + true => self.bulk_stream_pool.get().await, + } } } diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 518e4e5b84..5a50004fd1 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -30,6 +30,7 @@ //! TODO: observability. use std::collections::{BTreeMap, HashMap}; +use std::num::NonZero; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; @@ -44,22 +45,10 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -/// Max number of concurrent clients per channel. -/// -/// TODO: tune these constants, and make them configurable. -/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels -/// with only streams. -const CLIENTS_PER_CHANNEL: usize = 16; - -/// Maximum number of concurrent clients per `ClientPool`. -const CLIENT_LIMIT: usize = 64; - -/// Max number of pipelined requests per gRPC GetPage stream. -const STREAM_QUEUE_DEPTH: usize = 2; - /// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2 -/// stream multiplexing), up to `CLIENTS_PER_CHANNEL`. The pool does not limit the number of -/// channels, and instead relies on `ClientPool` to limit the number of concurrent clients. +/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this. +/// The pool does not limit the number of channels, and instead relies on `ClientPool` or +/// `StreamPool` to limit the number of concurrent clients. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// @@ -69,6 +58,8 @@ const STREAM_QUEUE_DEPTH: usize = 2; pub struct ChannelPool { /// Pageserver endpoint to connect to. endpoint: Endpoint, + /// Max number of clients per channel. Beyond this, a new channel will be created. + max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, /// Channel ID generator. @@ -86,13 +77,14 @@ struct ChannelEntry { impl ChannelPool { /// Creates a new channel pool for the given Pageserver endpoint. - pub fn new(endpoint: E) -> anyhow::Result> + pub fn new(endpoint: E, max_clients_per_channel: NonZero) -> anyhow::Result> where E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { Ok(Arc::new(Self { endpoint: endpoint.try_into()?, + max_clients_per_channel, channels: Mutex::default(), next_channel_id: AtomicUsize::default(), })) @@ -120,8 +112,11 @@ impl ChannelPool { // with lower-ordered channel IDs first. This will cluster clients in lower-ordered // channels, and free up higher-ordered channels such that they can be reaped. for (&id, entry) in channels.iter_mut() { - assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow"); - if entry.clients < CLIENTS_PER_CHANNEL { + assert!( + entry.clients <= self.max_clients_per_channel.get(), + "channel overflow" + ); + if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; return ChannelGuard { pool: Arc::downgrade(self), @@ -181,7 +176,7 @@ impl Drop for ChannelGuard { /// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner /// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total -/// number of concurrent clients to `CLIENT_LIMIT` via semaphore. 
+/// number of concurrent clients to `max_clients` via semaphore. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// @@ -197,8 +192,8 @@ pub struct ClientPool { auth_token: Option, /// Channel pool to acquire channels from. channel_pool: Arc, - /// Limits the max number of concurrent clients for this pool. - limiter: Arc, + /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded. + limiter: Option>, /// Idle pooled clients. Acquired clients are removed from here and returned on drop. /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in @@ -221,13 +216,15 @@ struct ClientEntry { impl ClientPool { /// Creates a new client pool for the given tenant shard. Channels are acquired from the given - /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. + /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to + /// `max_clients` concurrent clients, or unbounded if None. pub fn new( channel_pool: Arc, tenant_id: TenantId, timeline_id: TimelineId, shard_id: ShardIndex, auth_token: Option, + max_clients: Option>, ) -> Arc { Arc::new(Self { tenant_id, @@ -236,25 +233,24 @@ impl ClientPool { auth_token, channel_pool, idle: Mutex::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT)), + limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))), next_client_id: AtomicUsize::default(), }) } /// Gets a client from the pool, or creates a new one if necessary. Connections are established - /// lazily and do not block, but this call can block if the pool is at `CLIENT_LIMIT`. The - /// client is returned to the pool when the guard is dropped. + /// lazily and do not block, but this call can block if the pool is at `max_clients`. The client + /// is returned to the pool when the guard is dropped. /// /// This is moderately performance-sensitive. It is called for every unary request, but these /// establish a new gRPC stream per request so they're already expensive. GetPage requests use /// the `StreamPool` instead. pub async fn get(self: &Arc) -> anyhow::Result { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } // Fast path: acquire an idle client from the pool. if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() { @@ -296,9 +292,9 @@ impl ClientPool { pub struct ClientGuard { pool: Weak, id: ClientID, - client: Option, // Some until dropped - channel_guard: Option, // Some until dropped - permit: OwnedSemaphorePermit, + client: Option, // Some until dropped + channel_guard: Option, // Some until dropped + permit: Option, // None if pool is unbounded } impl Deref for ClientGuard { @@ -341,16 +337,21 @@ impl Drop for ClientGuard { /// TODO: reap idle streams. /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { - /// The client pool to acquire clients from. + /// The client pool to acquire clients from. Must be unbounded. client_pool: Arc, /// All pooled streams. /// /// Incoming requests will be sent over an existing stream with available capacity. If all - /// streams are full, a new one is spun up and added to the pool (up to the `ClientPool` limit). 
- /// Each stream has an associated Tokio task that processes requests and responses. + /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each + /// stream has an associated Tokio task that processes requests and responses. streams: Arc>>, - /// Limits the max number of concurrent requests (not streams). - limiter: Arc, + /// The max number of concurrent streams, or None if unbounded. + max_streams: Option>, + /// The max number of concurrent requests per stream. + max_queue_depth: NonZero, + /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. + /// None if the pool is unbounded. + limiter: Option>, /// Stream ID generator. next_stream_id: AtomicUsize, } @@ -369,16 +370,27 @@ struct StreamEntry { } impl StreamPool { - /// Creates a new stream pool, using the given client pool. + /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth` + /// concurrent requests on each stream, and use up to `max_streams` concurrent streams. /// - /// NB: the stream pool should use a dedicated client pool. Otherwise, long-lived streams may - /// fill up the client pool and starve out unary requests. Client pools can share the same - /// `ChannelPool` though, since the channel pool is unbounded. - pub fn new(client_pool: Arc) -> Arc { + /// The client pool must be unbounded. The stream pool will enforce its own limits, and because + /// streams are long-lived they can cause persistent starvation if they exhaust the client pool. + /// The stream pool should generally have its own dedicated client pool (but it can share a + /// channel pool with others since these are always unbounded). + pub fn new( + client_pool: Arc, + max_streams: Option>, + max_queue_depth: NonZero, + ) -> Arc { + assert!(client_pool.limiter.is_none(), "bounded client pool"); Arc::new(Self { client_pool, streams: Arc::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH)), + limiter: max_streams.map(|max_streams| { + Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) + }), + max_streams, + max_queue_depth, next_stream_id: AtomicUsize::default(), }) } @@ -402,18 +414,17 @@ impl StreamPool { /// /// For now, we just do something simple and functional, but very inefficient (linear scan). pub async fn get(&self) -> StreamGuard { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } let mut streams = self.streams.lock().unwrap(); // Look for a pooled stream with available capacity. for entry in streams.values() { assert!( - entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH, + entry.queue_depth.load(Ordering::Relaxed) <= self.max_queue_depth.get(), "stream queue overflow" ); if entry @@ -421,7 +432,7 @@ impl StreamPool { .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| { // Increment the queue depth via compare-and-swap. // TODO: review ordering. - (queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1) + (queue_depth < self.max_queue_depth.get()).then_some(queue_depth + 1) }) .is_ok() { @@ -438,18 +449,16 @@ impl StreamPool { // join onto this stream and also create additional streams concurrently if this fills up. 
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); + let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); let entry = StreamEntry { sender: req_tx.clone(), queue_depth: queue_depth.clone(), }; streams.insert(id, entry); - // NB: make sure we don't overshoot the client limit. The semaphore limit is CLIENT_LIMIT * - // STREAM_QUEUE_DEPTH, but if we were to misaccount queue depth we'd try to spin up more - // streams than CLIENT_LIMIT and block on the client pool ~forever. This should not happen - // because we only acquire queue depth under lock and after acquiring a semaphore permit. - assert!(streams.len() <= CLIENT_LIMIT, "stream overflow"); + if let Some(max_streams) = self.max_streams { + assert!(streams.len() <= max_streams.get(), "stream overflow"); + }; let client_pool = self.client_pool.clone(); let streams = self.streams.clone(); @@ -484,19 +493,22 @@ impl StreamPool { // Acquire a client from the pool and create a stream. let mut client = client_pool.get().await?; - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); - let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); + // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could + // theoretically deadlock if both the client and server block on sends (since we're not + // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and + // low queue depths, but it was seen to happen with the libpq protocol so better safe than + // sorry. It should never buffer more than the queue depth anyway, but using an unbounded + // channel guarantees that it will never block. + let (req_tx, req_rx) = mpsc::unbounded_channel(); + let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); let mut resp_stream = client.get_pages(req_stream).await?; // Track caller response channels by request ID. If the task returns early, these response // channels will be dropped and the waiting callers will receive an error. - let mut callers = HashMap::with_capacity(STREAM_QUEUE_DEPTH); + let mut callers = HashMap::new(); // Process requests and responses. loop { - // NB: this can trip if the server doesn't respond to a request, so only debug_assert. - debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream queue overflow"); - tokio::select! { // Receive requests from callers and send them to the stream. req = caller_rx.recv() => { @@ -515,8 +527,8 @@ impl StreamPool { } callers.insert(req.request_id, resp_tx); - // Send the request on the stream. Bail out if the send fails. - req_tx.send(req).await.map_err(|_| { + // Send the request on the stream. Bail out if the stream is closed. + req_tx.send(req).map_err(|_| { tonic::Status::unavailable("stream closed") })?; } @@ -545,7 +557,7 @@ impl StreamPool { pub struct StreamGuard { sender: RequestSender, queue_depth: Arc, - permit: OwnedSemaphorePermit, + permit: Option, // None if pool is unbounded } impl StreamGuard { diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index c5b6f06879..d0d3517d41 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -384,7 +384,7 @@ impl From for proto::GetPageRequest { pub type RequestID = u64; /// A GetPage request class. 
-#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. @@ -397,6 +397,19 @@ pub enum GetPageClass { Background, } +impl GetPageClass { + /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than + /// latency-sensitive). + pub fn is_bulk(&self) -> bool { + match self { + Self::Unknown => false, + Self::Normal => false, + Self::Prefetch => true, + Self::Background => true, + } + } +} + impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb { From 12c26243fc5e8a4522a8f848923f6e3eb7e0ad7c Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 9 Jul 2025 11:47:21 -0500 Subject: [PATCH 02/15] Fix typo in migration testing related to pg_monitor (#12530) We should be joining on the neon_superuser roleid, not the pg_monitor roleid. Signed-off-by: Tristan Partin --- .../tests/0004-grant_pg_monitor_to_neon_superuser.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql index acb8dd417d..deb7a364af 100644 --- a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql +++ b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql @@ -6,7 +6,7 @@ BEGIN admin_option AS admin INTO monitor FROM pg_auth_members - WHERE roleid = 'pg_monitor'::regrole + WHERE roleid = 'neon_superuser'::regrole AND member = 'pg_monitor'::regrole; IF NOT monitor.member THEN From 4bbabc092a7b675eb5b624c3d647e216e98dbe2d Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Wed, 9 Jul 2025 21:16:06 +0400 Subject: [PATCH 03/15] tests: wait for flush lsn in test_branch_creation_before_gc (#12527) ## Problem Test `test_branch_creation_before_gc` is flaky in the internal repo. Pageserver sometimes lags behind write LSN. When we call GC it might not reach the LSN we try to create the branch at yet. ## Summary of changes - Wait till flush lsn on pageserver reaches the latest LSN before calling GC. --- test_runner/regress/test_branch_and_gc.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test_runner/regress/test_branch_and_gc.py b/test_runner/regress/test_branch_and_gc.py index 8447c9bf2d..148f469a95 100644 --- a/test_runner/regress/test_branch_and_gc.py +++ b/test_runner/regress/test_branch_and_gc.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING import pytest from fixtures.common_types import Lsn, TimelineId from fixtures.log_helper import log +from fixtures.neon_fixtures import wait_for_last_flush_lsn from fixtures.pageserver.http import TimelineCreate406 from fixtures.utils import query_scalar, skip_in_debug_build @@ -162,6 +163,9 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): ) lsn = Lsn(res[2][0][0]) + # Wait for all WAL to reach the pageserver, so GC cutoff LSN is greater than `lsn`. + wait_for_last_flush_lsn(env, endpoint0, tenant, b0) + # Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the # branch creation task but the individual timeline GC iteration happens *after* # the branch creation task. 
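A minimal sketch of the ordering this change enforces, using the fixture and variable
names from the test above (the GC iteration and branch creation themselves are elided):

```python
from fixtures.common_types import Lsn
from fixtures.neon_fixtures import wait_for_last_flush_lsn

# `lsn` is the point the test will later branch at, read from the endpoint above.
lsn = Lsn(res[2][0][0])

# Ensure the pageserver has ingested WAL up to `lsn` before GC runs, so the GC
# cutoff LSN is computed past `lsn` rather than from a lagging flush LSN.
wait_for_last_flush_lsn(env, endpoint0, tenant, b0)

# ... only now trigger the GC iteration (behind the sleep failpoint) and create
# the branch at `lsn`.
```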
From fe0ddb7169514b3c755c01a519d1067b696d2925 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 9 Jul 2025 18:41:34 +0100 Subject: [PATCH 04/15] libs: make remote storage failure injection probabilistic (#12526) Change the unreliable storage wrapper to fail by probability when there are more failure attempts left. Co-authored-by: Yecheng Yang --- libs/pageserver_api/src/config.rs | 2 + libs/remote_storage/Cargo.toml | 1 + libs/remote_storage/src/lib.rs | 10 +++- libs/remote_storage/src/simulate_failures.rs | 31 ++++++++-- libs/utils/src/env.rs | 59 ++++++++++++++++++++ pageserver/src/bin/pageserver.rs | 7 ++- pageserver/src/config.rs | 6 ++ proxy/src/context/parquet.rs | 2 +- 8 files changed, 107 insertions(+), 11 deletions(-) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index dc7e9aed7f..22815955c1 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -226,6 +226,7 @@ pub struct ConfigToml { pub synthetic_size_calculation_interval: Duration, pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig, pub test_remote_failures: u64, + pub test_remote_failures_probability: u64, pub ondemand_download_behavior_treat_error_as_warn: bool, #[serde(with = "humantime_serde")] pub background_task_maximum_delay: Duration, @@ -758,6 +759,7 @@ impl Default for ConfigToml { disk_usage_based_eviction: DiskUsageEvictionTaskConfig::default(), test_remote_failures: (0), + test_remote_failures_probability: (100), ondemand_download_behavior_treat_error_as_warn: (false), diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 69316fd493..0ae13552b8 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -43,6 +43,7 @@ itertools.workspace = true sync_wrapper = { workspace = true, features = ["futures"] } byteorder = "1.4" +rand = "0.8.5" [dev-dependencies] camino-tempfile.workspace = true diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index ed416b2811..5885c3e791 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -732,9 +732,15 @@ impl GenericRemoteStorage { }) } - pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self { - Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first))) + /* BEGIN_HADRON */ + pub fn unreliable_wrapper(s: Self, fail_first: u64, fail_probability: u64) -> Self { + Self::Unreliable(Arc::new(UnreliableWrapper::new( + s, + fail_first, + fail_probability, + ))) } + /* END_HADRON */ /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata. pub async fn upload_storage_object( diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index f9856a5856..30d116f57c 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -1,6 +1,8 @@ //! This module provides a wrapper around a real RemoteStorage implementation that //! causes the first N attempts at each upload or download operatio to fail. For //! testing purposes. +use rand::Rng; +use std::cmp; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::num::NonZeroU32; @@ -25,6 +27,12 @@ pub struct UnreliableWrapper { // Tracks how many failed attempts of each operation has been made. attempts: Mutex>, + + /* BEGIN_HADRON */ + // This the probability of failure for each operation, ranged from [0, 100]. + // The probability is default to 100, which means that all operations will fail. 
+ attempt_failure_probability: u64, + /* END_HADRON */ } /// Used to identify retries of different unique operation. @@ -40,7 +48,11 @@ enum RemoteOp { } impl UnreliableWrapper { - pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self { + pub fn new( + inner: crate::GenericRemoteStorage, + attempts_to_fail: u64, + attempt_failure_probability: u64, + ) -> Self { assert!(attempts_to_fail > 0); let inner = match inner { GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s), @@ -51,9 +63,11 @@ impl UnreliableWrapper { panic!("Can't wrap unreliable wrapper unreliably") } }; + let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100); UnreliableWrapper { inner, attempts_to_fail, + attempt_failure_probability: actual_attempt_failure_probability, attempts: Mutex::new(HashMap::new()), } } @@ -66,6 +80,7 @@ impl UnreliableWrapper { /// fn attempt(&self, op: RemoteOp) -> anyhow::Result { let mut attempts = self.attempts.lock().unwrap(); + let mut rng = rand::thread_rng(); match attempts.entry(op) { Entry::Occupied(mut e) => { @@ -75,15 +90,19 @@ impl UnreliableWrapper { *p }; - if attempts_before_this >= self.attempts_to_fail { - // let it succeed - e.remove(); - Ok(attempts_before_this) - } else { + /* BEGIN_HADRON */ + // If there are more attempts to fail, fail the request by probability. + if (attempts_before_this < self.attempts_to_fail) + && (rng.gen_range(0..=100) < self.attempt_failure_probability) + { let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key()); Err(error) + } else { + e.remove(); + Ok(attempts_before_this) } + /* END_HADRON */ } Entry::Vacant(e) => { let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key()); diff --git a/libs/utils/src/env.rs b/libs/utils/src/env.rs index 2a85f54a01..cc1cbf8009 100644 --- a/libs/utils/src/env.rs +++ b/libs/utils/src/env.rs @@ -44,3 +44,62 @@ where } } } + +/* BEGIN_HADRON */ +pub enum DeploymentMode { + Dev, + Staging, + Prod, +} + +pub fn get_deployment_mode() -> Option { + match std::env::var("DEPLOYMENT_MODE") { + Ok(env) => match env.as_str() { + "development" => Some(DeploymentMode::Dev), + "staging" => Some(DeploymentMode::Staging), + "production" => Some(DeploymentMode::Prod), + _ => { + tracing::error!("Unexpected DEPLOYMENT_MODE: {}", env); + None + } + }, + Err(_) => { + tracing::error!("DEPLOYMENT_MODE not set"); + None + } + } +} + +pub fn is_dev_or_staging() -> bool { + matches!( + get_deployment_mode(), + Some(DeploymentMode::Dev) | Some(DeploymentMode::Staging) + ) +} + +pub enum TestingMode { + Chaos, + Stress, +} + +pub fn get_test_mode() -> Option { + match std::env::var("HADRON_TEST_MODE") { + Ok(env) => match env.as_str() { + "chaos" => Some(TestingMode::Chaos), + "stress" => Some(TestingMode::Stress), + _ => { + tracing::error!("Unexpected HADRON_TEST_MODE: {}", env); + None + } + }, + Err(_) => { + tracing::error!("HADRON_TEST_MODE not set"); + None + } + } +} + +pub fn is_chaos_testing() -> bool { + matches!(get_test_mode(), Some(TestingMode::Chaos)) +} +/* END_HADRON */ diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 327384fd82..78aba25d2e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -889,8 +889,11 @@ async fn create_remote_storage_client( "Simulating remote failures for first {} attempts of each op", conf.test_remote_failures ); - remote_storage = - GenericRemoteStorage::unreliable_wrapper(remote_storage, 
conf.test_remote_failures); + remote_storage = GenericRemoteStorage::unreliable_wrapper( + remote_storage, + conf.test_remote_failures, + conf.test_remote_failures_probability, + ); } Ok(remote_storage) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 99d7e0ca3a..15ec31b0a6 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -147,7 +147,11 @@ pub struct PageServerConf { pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig, + // The number of allowed failures in remote storage operations. pub test_remote_failures: u64, + // The probability of failure in remote storage operations. Only works when test_remote_failures > 1. + // Use 100 for 100% failure, 0 for no failure. + pub test_remote_failures_probability: u64, pub ondemand_download_behavior_treat_error_as_warn: bool, @@ -392,6 +396,7 @@ impl PageServerConf { synthetic_size_calculation_interval, disk_usage_based_eviction, test_remote_failures, + test_remote_failures_probability, ondemand_download_behavior_treat_error_as_warn, background_task_maximum_delay, control_plane_api, @@ -461,6 +466,7 @@ impl PageServerConf { synthetic_size_calculation_interval, disk_usage_based_eviction, test_remote_failures, + test_remote_failures_probability, ondemand_download_behavior_treat_error_as_warn, background_task_maximum_delay, control_plane_api: control_plane_api diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index b55cc14532..4d8df19476 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -267,7 +267,7 @@ async fn worker_inner( ) -> anyhow::Result<()> { #[cfg(any(test, feature = "testing"))] let storage = if config.test_remote_failures > 0 { - GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures) + GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures, 100) } else { storage }; From 28f604d628bfefa26b3016421756f91c8b6b2817 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 9 Jul 2025 13:45:50 -0500 Subject: [PATCH 05/15] Make pg_monitor neon_superuser test more robust (#12532) Make sure to check for NULL just in case. Signed-off-by: Tristan Partin Co-authored-by: Vikas Jain --- .../0004-grant_pg_monitor_to_neon_superuser.sql | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql index deb7a364af..3464a2b1cf 100644 --- a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql +++ b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql @@ -6,14 +6,18 @@ BEGIN admin_option AS admin INTO monitor FROM pg_auth_members - WHERE roleid = 'neon_superuser'::regrole - AND member = 'pg_monitor'::regrole; + WHERE roleid = 'pg_monitor'::regrole + AND member = 'neon_superuser'::regrole; - IF NOT monitor.member THEN + IF monitor IS NULL THEN + RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor'; + END IF; + + IF monitor.admin IS NULL OR NOT monitor.member THEN RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor'; END IF; - IF NOT monitor.admin THEN + IF monitor.admin IS NULL OR NOT monitor.admin THEN RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor'; END IF; END $$; From 0b639ba608d65b90df468346e72777e8953a4f42 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." 
<4198311+skyzh@users.noreply.github.com> Date: Wed, 9 Jul 2025 16:22:55 -0400 Subject: [PATCH 06/15] fix(storcon): correctly pass through lease error code (#12519) ## Problem close LKB-199 ## Summary of changes We always return the error as 500 to the cplane if a LSN lease request fails. This cause issues for the cplane as they don't retry on 500. This patch correctly passes through the error and assign the error code so that cplane can know if it is a retryable error. (TODO: look at the cplane code and learn the retry logic). Note that this patch does not resolve LKB-253 -- we need to handle not found error separately in the lsn lease path, like wait until the tenant gets attached, or return 503 so that cplane can retry. --------- Signed-off-by: Alex Chi Z --- storage_controller/src/service.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ed6643d641..d2f7287be9 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -4428,7 +4428,7 @@ impl Service { .await; let mut failed = 0; - for (tid, result) in targeted_tenant_shards.iter().zip(results.into_iter()) { + for (tid, (_, result)) in targeted_tenant_shards.iter().zip(results.into_iter()) { match result { Ok(ok) => { if tid.is_shard_zero() { @@ -4795,7 +4795,7 @@ impl Service { .await; let mut valid_until = None; - for r in res { + for (node, r) in res { match r { Ok(lease) => { if let Some(ref mut valid_until) = valid_until { @@ -4805,7 +4805,7 @@ impl Service { } } Err(e) => { - return Err(ApiError::InternalServerError(anyhow::anyhow!(e))); + return Err(passthrough_api_error(&node, e)); } } } @@ -4919,7 +4919,7 @@ impl Service { max_retries: u32, timeout: Duration, cancel: &CancellationToken, - ) -> Vec> + ) -> Vec<(Node, mgmt_api::Result)> where O: Fn(TenantShardId, PageserverClient) -> F + Copy, F: std::future::Future>, @@ -4940,16 +4940,16 @@ impl Service { cancel, ) .await; - (idx, r) + (idx, node, r) }); } - while let Some((idx, r)) = futs.next().await { - results.push((idx, r.unwrap_or(Err(mgmt_api::Error::Cancelled)))); + while let Some((idx, node, r)) = futs.next().await { + results.push((idx, node, r.unwrap_or(Err(mgmt_api::Error::Cancelled)))); } - results.sort_by_key(|(idx, _)| *idx); - results.into_iter().map(|(_, r)| r).collect() + results.sort_by_key(|(idx, _, _)| *idx); + results.into_iter().map(|(_, node, r)| (node, r)).collect() } /// Helper for safely working with the shards in a tenant remotely on pageservers, for example @@ -5862,7 +5862,7 @@ impl Service { return; } - for result in self + for (_, result) in self .tenant_for_shards_api( attached, |tenant_shard_id, client| async move { @@ -5881,7 +5881,7 @@ impl Service { } } - for result in self + for (_, result) in self .tenant_for_shards_api( secondary, |tenant_shard_id, client| async move { @@ -8768,7 +8768,7 @@ impl Service { ) .await; - for ((tenant_shard_id, node, optimization), secondary_status) in + for ((tenant_shard_id, node, optimization), (_, secondary_status)) in want_secondary_status.into_iter().zip(results.into_iter()) { match secondary_status { From 2edd59aefbcd79288735cbbed335a27880597529 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 9 Jul 2025 23:15:44 +0200 Subject: [PATCH 07/15] impr(compaction): unify checking of `CompactionError` for cancellation reason (#12392) There are a couple of places that call `CompactionError::is_cancel` but don't check the `::Other` variant via 
downcasting for root cause being cancellation. The only place that does it is `log_compaction_error`. It's sad we have to do it, but, until we get around cleaning up all the culprits, a step forward is to unify the behavior so that all places that inspect a `CompactionError` for cancellation reason follow the same behavior. Thus, this PR ... - moves the downcasting checks against the `::Other` variant from `log_compaction_error` into `is_cancel()` and - enforces via type system that `.is_cancel()` is used to check whether a CompactionError is due to cancellation (matching on the `CompactionError::ShuttingDown` will cause a compile-time error). I don't think there's a _serious_ case right now where matching instead of using `is_cancel` causes problems. The worst case I could find is the circuit breaker and `compaction_failed`, which don't really matter if we're shutting down the timeline anyway. But it's unaesthetic and might cause log/alert noise down the line, so, this PR fixes that at least. Refs - https://databricks.atlassian.net/browse/LKB-182 - slack conversation about this PR: https://databricks.slack.com/archives/C09254R641L/p1751284317955159 --- pageserver/src/http/routes.rs | 11 +- pageserver/src/tenant.rs | 17 ++- pageserver/src/tenant/tasks.rs | 46 +------- pageserver/src/tenant/timeline.rs | 115 ++++++++++++++----- pageserver/src/tenant/timeline/compaction.rs | 26 ++--- 5 files changed, 117 insertions(+), 98 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 3612686b5d..767bba49e2 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -79,8 +79,8 @@ use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerNa use crate::tenant::timeline::layer_manager::LayerManagerLockHolder; use crate::tenant::timeline::offload::{OffloadError, offload_timeline}; use crate::tenant::timeline::{ - CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline, - WaitLsnTimeout, WaitLsnWaiter, import_pgdata, + CompactFlags, CompactOptions, CompactRequest, MarkInvisibleRequest, Timeline, WaitLsnTimeout, + WaitLsnWaiter, import_pgdata, }; use crate::tenant::{ GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError, @@ -2500,9 +2500,10 @@ async fn timeline_checkpoint_handler( .compact(&cancel, flags, &ctx) .await .map_err(|e| - match e { - CompactionError::ShuttingDown => ApiError::ShuttingDown, - CompactionError::Other(e) => ApiError::InternalServerError(e), + if e.is_cancel() { + ApiError::ShuttingDown + } else { + ApiError::InternalServerError(e.into_anyhow()) } )?; } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f576119db8..240ba36236 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3291,7 +3291,7 @@ impl TenantShard { // Ignore this, we likely raced with unarchival. OffloadError::NotArchived => Ok(()), OffloadError::AlreadyInProgress => Ok(()), - OffloadError::Cancelled => Err(CompactionError::ShuttingDown), + OffloadError::Cancelled => Err(CompactionError::new_cancelled()), // don't break the anyhow chain OffloadError::Other(err) => Err(CompactionError::Other(err)), })?; @@ -3321,16 +3321,13 @@ impl TenantShard { /// Trips the compaction circuit breaker if appropriate. 
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) { - match err { - err if err.is_cancel() => {} - CompactionError::ShuttingDown => (), - CompactionError::Other(err) => { - self.compaction_circuit_breaker - .lock() - .unwrap() - .fail(&CIRCUIT_BREAKERS_BROKEN, err); - } + if err.is_cancel() { + return; } + self.compaction_circuit_breaker + .lock() + .unwrap() + .fail(&CIRCUIT_BREAKERS_BROKEN, err); } /// Cancel scheduled compaction tasks diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index bcece5589a..08fc7d61a5 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -17,17 +17,14 @@ use tracing::*; use utils::backoff::exponential_backoff_duration; use utils::completion::Barrier; use utils::pausable_failpoint; -use utils::sync::gate::GateError; use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS}; use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind}; -use crate::tenant::blob_io::WriteBlobError; use crate::tenant::throttle::Stats; use crate::tenant::timeline::CompactionError; use crate::tenant::timeline::compaction::CompactionOutcome; use crate::tenant::{TenantShard, TenantState}; -use crate::virtual_file::owned_buffers_io::write::FlushTaskError; /// Semaphore limiting concurrent background tasks (across all tenants). /// @@ -310,45 +307,12 @@ pub(crate) fn log_compaction_error( task_cancelled: bool, degrade_to_warning: bool, ) { - use CompactionError::*; + let is_cancel = err.is_cancel(); - use crate::tenant::PageReconstructError; - use crate::tenant::upload_queue::NotInitialized; - - let level = match err { - e if e.is_cancel() => return, - ShuttingDown => return, - _ if task_cancelled => Level::INFO, - Other(err) => { - let root_cause = err.root_cause(); - - let upload_queue = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_stopping()); - let timeline = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let buffered_writer_flush_task_canelled = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let write_blob_cancelled = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let gate_closed = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let is_stopping = upload_queue - || timeline - || buffered_writer_flush_task_canelled - || write_blob_cancelled - || gate_closed; - - if is_stopping { - Level::INFO - } else { - Level::ERROR - } - } + let level = if is_cancel || task_cancelled { + Level::INFO + } else { + Level::ERROR }; if let Some((error_count, sleep_duration)) = retry_info { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 6088f40669..0a026d288e 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1002,7 +1002,7 @@ impl From for tonic::Status { impl From for CompactionError { fn from(e: CreateImageLayersError) -> Self { match e { - CreateImageLayersError::Cancelled => CompactionError::ShuttingDown, + CreateImageLayersError::Cancelled => CompactionError::new_cancelled(), CreateImageLayersError::Other(e) => { CompactionError::Other(e.context("create image layers")) } @@ -2117,12 +2117,7 @@ impl Timeline { match &result { Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed), Err(e) if e.is_cancel() => {} - Err(CompactionError::ShuttingDown) => { - // Covered by the `Err(e) if e.is_cancel()` branch. 
- } - Err(CompactionError::Other(_)) => { - self.compaction_failed.store(true, AtomicOrdering::Relaxed) - } + Err(_) => self.compaction_failed.store(true, AtomicOrdering::Relaxed), }; result @@ -6057,26 +6052,88 @@ impl Drop for Timeline { } } -/// Top-level failure to compact. -#[derive(Debug, thiserror::Error)] -pub(crate) enum CompactionError { - #[error("The timeline or pageserver is shutting down")] - ShuttingDown, - #[error(transparent)] - Other(anyhow::Error), -} +pub(crate) use compaction_error::CompactionError; +/// In a private mod to enforce that [`CompactionError::is_cancel`] is used +/// instead of `match`ing on [`CompactionError::ShuttingDown`]. +mod compaction_error { + use utils::sync::gate::GateError; -impl CompactionError { - /// Errors that can be ignored, i.e., cancel and shutdown. - pub fn is_cancel(&self) -> bool { - matches!(self, Self::ShuttingDown) + use crate::{ + pgdatadir_mapping::CollectKeySpaceError, + tenant::{PageReconstructError, blob_io::WriteBlobError, upload_queue::NotInitialized}, + virtual_file::owned_buffers_io::write::FlushTaskError, + }; + + /// Top-level failure to compact. Use [`Self::is_cancel`]. + #[derive(Debug, thiserror::Error)] + pub(crate) enum CompactionError { + /// Use [`Self::is_cancel`] instead of checking for this variant. + #[error("The timeline or pageserver is shutting down")] + #[allow(private_interfaces)] + ShuttingDown(ForbidMatching), // private ForbidMatching enforces use of [`Self::is_cancel`]. + #[error(transparent)] + Other(anyhow::Error), } - pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self { - if err.is_cancel() { - Self::ShuttingDown - } else { - Self::Other(err.into_anyhow()) + #[derive(Debug)] + struct ForbidMatching; + + impl CompactionError { + pub fn new_cancelled() -> Self { + Self::ShuttingDown(ForbidMatching) + } + /// Errors that can be ignored, i.e., cancel and shutdown. + pub fn is_cancel(&self) -> bool { + let other = match self { + CompactionError::ShuttingDown(_) => return true, + CompactionError::Other(other) => other, + }; + + // The write path of compaction in particular often lacks differentiated + // handling errors stemming from cancellation from other errors. + // So, if requested, we also check the ::Other variant by downcasting. + // The list below has been found empirically from flaky tests and production logs. + // The process is simple: on ::Other(), compaction will print the enclosed + // anyhow::Error in debug mode, i.e., with backtrace. That backtrace contains the + // line where the write path / compaction code does undifferentiated error handling + // from a non-anyhow type to an anyhow type. Add the type to the list of downcasts + // below, following the same is_cancel() pattern. 
+ + let root_cause = other.root_cause(); + + let upload_queue = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_stopping()); + let timeline = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + let buffered_writer_flush_task_canelled = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + let write_blob_cancelled = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + let gate_closed = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + upload_queue + || timeline + || buffered_writer_flush_task_canelled + || write_blob_cancelled + || gate_closed + } + pub fn into_anyhow(self) -> anyhow::Error { + match self { + CompactionError::ShuttingDown(ForbidMatching) => anyhow::Error::new(self), + CompactionError::Other(e) => e, + } + } + pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self { + if err.is_cancel() { + Self::new_cancelled() + } else { + Self::Other(err.into_anyhow()) + } } } } @@ -6088,7 +6145,7 @@ impl From for CompactionError { CompactionError::Other(anyhow::anyhow!(value)) } super::upload_queue::NotInitialized::ShuttingDown - | super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown, + | super::upload_queue::NotInitialized::Stopped => CompactionError::new_cancelled(), } } } @@ -6098,7 +6155,7 @@ impl From for CompactionError { match e { super::storage_layer::layer::DownloadError::TimelineShutdown | super::storage_layer::layer::DownloadError::DownloadCancelled => { - CompactionError::ShuttingDown + CompactionError::new_cancelled() } super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads | super::storage_layer::layer::DownloadError::DownloadRequired @@ -6117,14 +6174,14 @@ impl From for CompactionError { impl From for CompactionError { fn from(_: layer_manager::Shutdown) -> Self { - CompactionError::ShuttingDown + CompactionError::new_cancelled() } } impl From for CompactionError { fn from(e: super::storage_layer::errors::PutError) -> Self { if e.is_cancel() { - CompactionError::ShuttingDown + CompactionError::new_cancelled() } else { CompactionError::Other(e.into_anyhow()) } @@ -6223,7 +6280,7 @@ impl Timeline { let mut guard = tokio::select! { guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard, _ = self.cancel.cancelled() => { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } }; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index c263df1eb2..18a0ca852d 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -572,8 +572,8 @@ impl GcCompactionQueue { } match res { Ok(res) => Ok(res), - Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown), - Err(CompactionError::Other(_)) => { + Err(e) if e.is_cancel() => Err(e), + Err(_) => { // There are some cases where traditional gc might collect some layer // files causing gc-compaction cannot read the full history of the key. // This needs to be resolved in the long-term by improving the compaction @@ -1260,7 +1260,7 @@ impl Timeline { // Is the timeline being deleted? 
if self.is_stopping() { trace!("Dropping out of compaction on timeline shutdown"); - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let target_file_size = self.get_checkpoint_distance(); @@ -1624,7 +1624,7 @@ impl Timeline { for (i, layer) in layers_to_rewrite.into_iter().enumerate() { if self.cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total); @@ -1722,7 +1722,7 @@ impl Timeline { Ok(()) => {}, Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)), Err(WaitCompletionError::UploadQueueShutDownOrStopped) => { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } }, // Don't wait if there's L0 compaction to do. We don't need to update the outcome @@ -1985,7 +1985,7 @@ impl Timeline { let mut all_keys = Vec::new(); for l in deltas_to_compact.iter() { if self.cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?; let keys = delta @@ -2078,7 +2078,7 @@ impl Timeline { stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now(); if self.cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now(); @@ -2186,7 +2186,7 @@ impl Timeline { // avoid hitting the cancellation token on every key. in benches, we end up // shuffling an order of million keys per layer, this means we'll check it // around tens of times per layer. - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let same_key = prev_key == Some(key); @@ -2271,7 +2271,7 @@ impl Timeline { if writer.is_none() { if self.cancel.is_cancelled() { // to be somewhat responsive to cancellation, check for each new layer - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } // Create writer if not initiaized yet writer = Some( @@ -2527,7 +2527,7 @@ impl Timeline { // Is the timeline being deleted? if self.is_stopping() { trace!("Dropping out of compaction on timeline shutdown"); - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let (dense_ks, _sparse_ks) = self @@ -3189,7 +3189,7 @@ impl Timeline { let gc_lock = async { tokio::select! { guard = self.gc_lock.lock() => Ok(guard), - _ = cancel.cancelled() => Err(CompactionError::ShuttingDown), + _ = cancel.cancelled() => Err(CompactionError::new_cancelled()), } }; @@ -3462,7 +3462,7 @@ impl Timeline { } total_layer_size += layer.layer_desc().file_size; if cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let should_yield = yield_for_l0 && self @@ -3609,7 +3609,7 @@ impl Timeline { } if cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let should_yield = yield_for_l0 From 13e38a58a14c60da94486904d60a8b9e8e391503 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 9 Jul 2025 16:35:39 -0500 Subject: [PATCH 08/15] Grant pg_signal_backend to neon_superuser (#12533) Allow neon_superuser to cancel backends from non-neon_superusers, excluding Postgres superusers. 
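For illustration, membership in `pg_signal_backend` lets `neon_superuser` signal
backends of non-superuser roles, and `WITH ADMIN OPTION` lets it pass that
privilege on. A small sketch (the role names are assumed examples):

```sql
-- Connected as a neon_superuser member: cancel another role's backend.
SELECT pg_cancel_backend(pid)
  FROM pg_stat_activity
 WHERE usename = 'app_user';  -- assumed example role; superuser backends remain off limits

-- ADMIN OPTION allows delegating the privilege further.
GRANT pg_signal_backend TO app_admin;  -- assumed example role
```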
Signed-off-by: Tristan Partin Co-authored-by: Vikas Jain --- ...nt_pg_signal_backend_to_neon_superuser.sql | 1 + ...nt_pg_signal_backend_to_neon_superuser.sql | 23 +++++++++++++++++++ compute_tools/src/spec.rs | 1 + 3 files changed, 25 insertions(+) create mode 100644 compute_tools/src/migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql create mode 100644 compute_tools/src/migrations/tests/0012-grant_pg_signal_backend_to_neon_superuser.sql diff --git a/compute_tools/src/migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql b/compute_tools/src/migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql new file mode 100644 index 0000000000..36e31544be --- /dev/null +++ b/compute_tools/src/migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql @@ -0,0 +1 @@ +GRANT pg_signal_backend TO neon_superuser WITH ADMIN OPTION; diff --git a/compute_tools/src/migrations/tests/0012-grant_pg_signal_backend_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0012-grant_pg_signal_backend_to_neon_superuser.sql new file mode 100644 index 0000000000..e62b742d30 --- /dev/null +++ b/compute_tools/src/migrations/tests/0012-grant_pg_signal_backend_to_neon_superuser.sql @@ -0,0 +1,23 @@ +DO $$ +DECLARE + signal_backend record; +BEGIN + SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member, + admin_option AS admin + INTO signal_backend + FROM pg_auth_members + WHERE roleid = 'pg_signal_backend'::regrole + AND member = 'neon_superuser'::regrole; + + IF signal_backend IS NULL THEN + RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_signal_backend'; + END IF; + + IF signal_backend.member IS NULL OR NOT signal_backend.member THEN + RAISE EXCEPTION 'neon_superuser is not a member of pg_signal_backend'; + END IF; + + IF signal_backend.admin IS NULL OR NOT signal_backend.admin THEN + RAISE EXCEPTION 'neon_superuser cannot grant pg_signal_backend'; + END IF; +END $$; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 43cfbb48f7..b6382b2f56 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -197,6 +197,7 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> { include_str!( "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql" ), + include_str!("./migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql"), ]; MigrationRunner::new(client, &migrations) From 1a45b2ec900b37c40608ca94a1d9d37cad12fce8 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Thu, 10 Jul 2025 10:06:33 +0200 Subject: [PATCH 09/15] Review security model for executing Event Trigger code. (#12463) When a function is owned by a superuser (bootstrap user or otherwise), we consider it safe to run it. Only a superuser could have installed it, typically from CREATE EXTENSION script: we trust the code to execute. ## Problem This is intended to solve running pg_graphql Event Triggers graphql_watch_ddl and graphql_watch_drop which are executing the secdef function graphql.increment_schema_version(). ## Summary of changes Allow executing Event Trigger function owned by a superuser and with SECURITY DEFINER properties. The Event Trigger code runs with superuser privileges, and we consider that it's fine. 
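As a sketch of the scenario this permits, using the `test_event_trigger_extension`
objects added by the test in this PR (the non-superuser role name is an assumed
example):

```sql
-- As a superuser: install the test extension; its SECURITY DEFINER event trigger
-- functions are owned by the superuser.
CREATE EXTENSION test_event_trigger_extension;

-- As an ordinary role, DDL now fires the superuser-owned event trigger instead of
-- being skipped.
SET ROLE app_user;                          -- assumed example role
CREATE TABLE some_table (id int);           -- ddl_command_end -> increment_schema_version()
SELECT event_trigger.get_schema_version();  -- version maintained by the trigger
```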
--------- Co-authored-by: Tristan Partin --- pgxn/neon/neon_ddl_handler.c | 33 +----- test_runner/fixtures/neon_fixtures.py | 27 +++++ .../test_event_trigger_extension--1.0.sql | 32 ++++++ .../test_event_trigger_extension.control | 8 ++ .../regress/test_download_extensions.py | 22 ---- .../regress/test_event_trigger_extension.py | 102 ++++++++++++++++++ 6 files changed, 174 insertions(+), 50 deletions(-) create mode 100644 test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension--1.0.sql create mode 100644 test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension.control create mode 100644 test_runner/regress/test_event_trigger_extension.py diff --git a/pgxn/neon/neon_ddl_handler.c b/pgxn/neon/neon_ddl_handler.c index 2ce7b0086b..1f03e52c67 100644 --- a/pgxn/neon/neon_ddl_handler.c +++ b/pgxn/neon/neon_ddl_handler.c @@ -953,7 +953,9 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private) /* * Fire Event Trigger if both function owner and current user are - * superuser, or none of them are. + * superuser. Allow executing Event Trigger function that belongs to a + * superuser when connected as a non-superuser, even when the function is + * SECURITY DEFINER. */ else if (event == FHET_START /* still enable it to pass pg_regress tests */ @@ -976,32 +978,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private) function_is_owned_by_super = superuser_arg(function_owner); /* - * 1. Refuse to run SECURITY DEFINER function that belongs to a - * superuser when the current user is not a superuser itself. - */ - if (!role_is_super - && function_is_owned_by_super - && function_is_secdef) - { - char *func_name = get_func_name(flinfo->fn_oid); - - ereport(WARNING, - (errmsg("Skipping Event Trigger"), - errdetail("Event Trigger function \"%s\" is owned by \"%s\" " - "and is SECURITY DEFINER", - func_name, - GetUserNameFromId(function_owner, false)))); - - /* - * we can't skip execution directly inside the fmgr_hook so - * instead we change the event trigger function to a noop - * function. - */ - force_noop(flinfo); - } - - /* - * 2. Refuse to run functions that belongs to a non-superuser when the + * Refuse to run functions that belongs to a non-superuser when the * current user is a superuser. * * We could run a SECURITY DEFINER user-function here and be safe with @@ -1009,7 +986,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private) * infrastructure maintenance operations, where we prefer to skip * running user-defined code. */ - else if (role_is_super && !function_is_owned_by_super) + if (role_is_super && !function_is_owned_by_super) { char *func_name = get_func_name(flinfo->fn_oid); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f54d5be635..42924f9b83 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1795,6 +1795,33 @@ def neon_env_builder( record_property("preserve_database_files", builder.preserve_database_files) +@pytest.fixture(scope="function") +def neon_env_builder_local( + neon_env_builder: NeonEnvBuilder, + test_output_dir: Path, + pg_distrib_dir: Path, +) -> NeonEnvBuilder: + """ + Fixture to create a Neon environment for test with its own pg_install copy. 
+ + This allows the test to edit the list of available extensions in the + local instance of Postgres used for the test, and install extensions via + downloading them when a remote extension is tested, for instance, or + copying files around for local extension testing. + """ + test_local_pginstall = test_output_dir / "pg_install" + log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}") + + # We can't copy only the version that we are currently testing because other + # binaries like the storage controller need specific Postgres versions. + shutil.copytree(pg_distrib_dir, test_local_pginstall) + + neon_env_builder.pg_distrib_dir = test_local_pginstall + log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}") + + return neon_env_builder + + @dataclass class PageserverPort: pg: int diff --git a/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension--1.0.sql b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension--1.0.sql new file mode 100644 index 0000000000..2b82102802 --- /dev/null +++ b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension--1.0.sql @@ -0,0 +1,32 @@ +\echo Use "CREATE EXTENSION test_event_trigger_extension" to load this file. \quit + +CREATE SCHEMA event_trigger; + +create sequence if not exists event_trigger.seq_schema_version as int cycle; + +create or replace function event_trigger.increment_schema_version() + returns event_trigger + security definer + language plpgsql +as $$ +begin + perform pg_catalog.nextval('event_trigger.seq_schema_version'); +end; +$$; + +create or replace function event_trigger.get_schema_version() + returns int + security definer + language sql +as $$ + select last_value from event_trigger.seq_schema_version; +$$; + +-- On DDL event, increment the schema version number +create event trigger event_trigger_watch_ddl + on ddl_command_end + execute procedure event_trigger.increment_schema_version(); + +create event trigger event_trigger_watch_drop + on sql_drop + execute procedure event_trigger.increment_schema_version(); diff --git a/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension.control b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension.control new file mode 100644 index 0000000000..4fe8c3341b --- /dev/null +++ b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension.control @@ -0,0 +1,8 @@ +default_version = '1.0' +comment = 'Test extension with Event Trigger' + +# make sure the extension objects are owned by the bootstrap user +# to check that the SECURITY DEFINER event trigger function is still +# called during non-superuser DDL events. 
+superuser = true +trusted = true diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index fe3b220c67..d7f78afac8 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -2,7 +2,6 @@ from __future__ import annotations import os import platform -import shutil import tarfile from enum import StrEnum from pathlib import Path @@ -31,27 +30,6 @@ if TYPE_CHECKING: from werkzeug.wrappers.request import Request -# use neon_env_builder_local fixture to override the default neon_env_builder fixture -# and use a test-specific pg_install instead of shared one -@pytest.fixture(scope="function") -def neon_env_builder_local( - neon_env_builder: NeonEnvBuilder, - test_output_dir: Path, - pg_distrib_dir: Path, -) -> NeonEnvBuilder: - test_local_pginstall = test_output_dir / "pg_install" - log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}") - - # We can't copy only the version that we are currently testing because other - # binaries like the storage controller need specific Postgres versions. - shutil.copytree(pg_distrib_dir, test_local_pginstall) - - neon_env_builder.pg_distrib_dir = test_local_pginstall - log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}") - - return neon_env_builder - - @final class RemoteExtension(StrEnum): SQL_ONLY = "test_extension_sql_only" diff --git a/test_runner/regress/test_event_trigger_extension.py b/test_runner/regress/test_event_trigger_extension.py new file mode 100644 index 0000000000..ac4351dcd5 --- /dev/null +++ b/test_runner/regress/test_event_trigger_extension.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import shutil +from pathlib import Path +from typing import TYPE_CHECKING, cast + +import pytest +from fixtures.log_helper import log +from fixtures.paths import BASE_DIR + +if TYPE_CHECKING: + from pathlib import Path + + from fixtures.neon_fixtures import ( + NeonEnvBuilder, + ) + from fixtures.pg_version import PgVersion + + +# use neon_env_builder_local fixture to override the default neon_env_builder fixture +# and use a test-specific pg_install instead of shared one +@pytest.fixture(scope="function") +def neon_env_builder_event_trigger_extension( + neon_env_builder_local: NeonEnvBuilder, + test_output_dir: Path, + pg_version: PgVersion, +) -> NeonEnvBuilder: + test_local_pginstall = test_output_dir / "pg_install" + + # Now copy the SQL only extension test_event_trigger_extension in the local + # pginstall extension directory on-disk + test_event_trigger_extension_dir = ( + BASE_DIR / "test_runner" / "regress" / "data" / "test_event_trigger_extension" + ) + + test_local_extension_dir = ( + test_local_pginstall / f"v{pg_version}" / "share" / "postgresql" / "extension" + ) + + log.info(f"copy {test_event_trigger_extension_dir} to {test_local_extension_dir}") + + for f in [ + test_event_trigger_extension_dir / "test_event_trigger_extension.control", + test_event_trigger_extension_dir / "test_event_trigger_extension--1.0.sql", + ]: + shutil.copy(f, test_local_extension_dir) + + return neon_env_builder_local + + +def test_event_trigger_extension(neon_env_builder_event_trigger_extension: NeonEnvBuilder): + """ + Test installing an extension that contains an Event Trigger. + + The Event Trigger function is owned by the extension owner, which at + CREATE EXTENSION is going to be the Postgres bootstrap user, per the + extension control file where both superuser = true and trusted = true. 
+ + Also this function is SECURTY DEFINER, to allow for making changes to + the extension SQL objects, in our case a sequence. + + This test makes sure that the event trigger function is fired correctly + by non-privileged user DDL actions such as CREATE TABLE. + """ + env = neon_env_builder_event_trigger_extension.init_start() + env.create_branch("test_event_trigger_extension") + + endpoint = env.endpoints.create_start("test_event_trigger_extension") + extension = "test_event_trigger_extension" + database = "test_event_trigger_extension" + + endpoint.safe_psql(f"CREATE DATABASE {database}") + endpoint.safe_psql(f"CREATE EXTENSION {extension}", dbname=database) + + # check that the extension is owned by the bootstrap superuser (cloud_admin) + pg_bootstrap_superuser_name = "cloud_admin" + with endpoint.connect(dbname=database) as pg_conn: + with pg_conn.cursor() as cur: + cur.execute( + f"select rolname from pg_roles r join pg_extension e on r.oid = e.extowner where extname = '{extension}'" + ) + owner = cast("tuple[str]", cur.fetchone())[0] + assert owner == pg_bootstrap_superuser_name, ( + f"extension {extension} is not owned by bootstrap user '{pg_bootstrap_superuser_name}'" + ) + + # test that the SQL-only Event Trigger (SECURITY DEFINER function) runs + # correctly now that the extension has been installed + # + # create table to trigger the event trigger, twice, check sequence count + with endpoint.connect(dbname=database) as pg_conn: + log.info("creating SQL objects (tables)") + with pg_conn.cursor() as cur: + cur.execute("CREATE TABLE foo1(id int primary key)") + cur.execute("CREATE TABLE foo2(id int)") + + cur.execute("SELECT event_trigger.get_schema_version()") + res = cast("tuple[int]", cur.fetchone()) + ver = res[0] + + log.info(f"schema version is now {ver}") + assert ver == 2, "schema version is not 2" From 08b19f001c77afbcd3fbde06a11e495d6222967a Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 10 Jul 2025 11:07:21 +0100 Subject: [PATCH 10/15] pageserver: optionally force image layer creation on timeout (#12529) This PR introduces a `image_creation_timeout` to page servers so that we can force the image creation after a certain period. This is set to 1 day on dev/staging for now, and will rollout to production 1/2 weeks later. Majority of the PR are boilerplate code to add the new knob. Specific changes of the PR are: 1. During L0 compaction, check if we should force a compaction if min(LSN) of all delta layers < force_image_creation LSN. 2. During image creation, check if we should force a compaction if the image's LSN < force_image_creation LSN and there are newer deltas with overlapping key ranges. 3. Also tweaked the check image creation interval to make sure we honor image_creation_timeout. Vlad's note: This should be a no-op. I added an extra PS config for the large timeline threshold to enable this. 
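As a concrete illustration of how the new knob is wired up end to end, here is a condensed version of the setup used by the `test_image_creation_timeout` regression test added later in this patch (values are illustrative):

```python
# Effectively disable threshold-based L0 compaction and image creation, so only the new
# timeout can trigger them.
tenant_conf = {
    "compaction_threshold": "100",
    "image_creation_threshold": "100",
    "image_layer_creation_check_threshold": "1",
    "image_layer_force_creation_period": "1s",
}
# Treat every timeline as "large" so the image-layer-required check runs eagerly.
neon_env_builder.pageserver_config_override = (
    "image_layer_generation_large_timeline_threshold=0"
)
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

# Once writes are older than the force-creation period, a compaction pass forces L0
# compaction and image layer creation even though the delta thresholds were never hit.
env.pageserver.http_client().timeline_compact(env.initial_tenant, env.initial_timeline)
```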
--------- Co-authored-by: Chen Luo --- control_plane/src/pageserver.rs | 6 ++ libs/pageserver_api/src/config.rs | 9 ++ libs/pageserver_api/src/models.rs | 18 ++++ pageserver/src/config.rs | 6 ++ pageserver/src/tenant.rs | 9 ++ pageserver/src/tenant/timeline.rs | 68 ++++++++++-- pageserver/src/tenant/timeline/compaction.rs | 100 ++++++++++++++++-- .../regress/test_attach_tenant_config.py | 1 + test_runner/regress/test_compaction.py | 75 +++++++++++++ 9 files changed, 275 insertions(+), 17 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 3f66960edd..3673d1f4f2 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -452,6 +452,12 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'image_creation_threshold' as non zero integer")?, + // HADRON + image_layer_force_creation_period: settings + .remove("image_layer_force_creation_period") + .map(humantime::parse_duration) + .transpose() + .context("Failed to parse 'image_layer_force_creation_period' as duration")?, image_layer_creation_check_threshold: settings .remove("image_layer_creation_check_threshold") .map(|x| x.parse::()) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 22815955c1..9e9c7a4dcb 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -272,6 +272,8 @@ pub struct ConfigToml { pub timeline_import_config: TimelineImportConfig, #[serde(skip_serializing_if = "Option::is_none")] pub basebackup_cache_config: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub image_layer_generation_large_timeline_threshold: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -561,6 +563,11 @@ pub struct TenantConfigToml { pub gc_period: Duration, // Delta layer churn threshold to create L1 image layers. pub image_creation_threshold: usize, + // HADRON + // When the timeout is reached, PageServer will (1) force compact any remaining L0 deltas and + // (2) create image layers if there are any L1 deltas. + #[serde(with = "humantime_serde")] + pub image_layer_force_creation_period: Option, // Determines how much history is retained, to allow // branching and read replicas at an older point in time. // The unit is time. 
@@ -823,6 +830,7 @@ impl Default for ConfigToml { }, basebackup_cache_config: None, posthog_config: None, + image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024), } } } @@ -916,6 +924,7 @@ impl Default for TenantConfigToml { gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD) .expect("cannot parse default gc period"), image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD, + image_layer_force_creation_period: None, pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL) .expect("cannot parse default PITR interval"), walreceiver_connect_timeout: humantime::parse_duration( diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 6735320484..56dd95eab3 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -597,6 +597,9 @@ pub struct TenantConfigPatch { pub gc_period: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub image_creation_threshold: FieldPatch, + // HADRON + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub image_layer_force_creation_period: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub pitr_interval: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] @@ -700,6 +703,11 @@ pub struct TenantConfig { #[serde(skip_serializing_if = "Option::is_none")] pub image_creation_threshold: Option, + // HADRON + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "humantime_serde")] + pub image_layer_force_creation_period: Option, + #[serde(skip_serializing_if = "Option::is_none")] #[serde(with = "humantime_serde")] pub pitr_interval: Option, @@ -798,6 +806,7 @@ impl TenantConfig { mut gc_horizon, mut gc_period, mut image_creation_threshold, + mut image_layer_force_creation_period, mut pitr_interval, mut walreceiver_connect_timeout, mut lagging_wal_timeout, @@ -861,6 +870,11 @@ impl TenantConfig { patch .image_creation_threshold .apply(&mut image_creation_threshold); + // HADRON + patch + .image_layer_force_creation_period + .map(|v| humantime::parse_duration(&v))? + .apply(&mut image_layer_force_creation_period); patch .pitr_interval .map(|v| humantime::parse_duration(&v))? @@ -942,6 +956,7 @@ impl TenantConfig { gc_horizon, gc_period, image_creation_threshold, + image_layer_force_creation_period, pitr_interval, walreceiver_connect_timeout, lagging_wal_timeout, @@ -1016,6 +1031,9 @@ impl TenantConfig { image_creation_threshold: self .image_creation_threshold .unwrap_or(global_conf.image_creation_threshold), + image_layer_force_creation_period: self + .image_layer_force_creation_period + .or(global_conf.image_layer_force_creation_period), pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval), walreceiver_connect_timeout: self .walreceiver_connect_timeout diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 15ec31b0a6..f64c5838ff 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -252,6 +252,10 @@ pub struct PageServerConf { pub timeline_import_config: pageserver_api::config::TimelineImportConfig, pub basebackup_cache_config: Option, + + /// Defines what is a big tenant for the purpose of image layer generation. 
+ /// See Timeline::should_check_if_image_layers_required + pub image_layer_generation_large_timeline_threshold: Option, } /// Token for authentication to safekeepers @@ -432,6 +436,7 @@ impl PageServerConf { posthog_config, timeline_import_config, basebackup_cache_config, + image_layer_generation_large_timeline_threshold, } = config_toml; let mut conf = PageServerConf { @@ -490,6 +495,7 @@ impl PageServerConf { dev_mode, timeline_import_config, basebackup_cache_config, + image_layer_generation_large_timeline_threshold, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 240ba36236..7e2e6d96b8 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4171,6 +4171,15 @@ impl TenantShard { .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } + // HADRON + pub fn get_image_creation_timeout(&self) -> Option { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf.image_layer_force_creation_period.or(self + .conf + .default_tenant_conf + .image_layer_force_creation_period) + } + pub fn get_pitr_interval(&self) -> Duration { let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); tenant_conf diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0a026d288e..a9bc0a060b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -351,6 +351,13 @@ pub struct Timeline { last_image_layer_creation_check_at: AtomicLsn, last_image_layer_creation_check_instant: std::sync::Mutex>, + // HADRON + /// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation + /// on this key range. + force_image_creation_lsn: AtomicLsn, + /// The last time instant when force_image_creation_lsn is computed. + force_image_creation_lsn_computed_at: std::sync::Mutex>, + /// Current logical size of the "datadir", at the last LSN. current_logical_size: LogicalSize, @@ -2846,6 +2853,18 @@ impl Timeline { .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } + // HADRON + fn get_image_creation_timeout(&self) -> Option { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .image_layer_force_creation_period + .or(self + .conf + .default_tenant_conf + .image_layer_force_creation_period) + } + fn get_compaction_algorithm_settings(&self) -> CompactionAlgorithmSettings { let tenant_conf = &self.tenant_conf.load(); tenant_conf @@ -3115,7 +3134,9 @@ impl Timeline { repartition_threshold: 0, last_image_layer_creation_check_at: AtomicLsn::new(0), last_image_layer_creation_check_instant: Mutex::new(None), - + // HADRON + force_image_creation_lsn: AtomicLsn::new(0), + force_image_creation_lsn_computed_at: std::sync::Mutex::new(None), last_received_wal: Mutex::new(None), rel_size_latest_cache: RwLock::new(HashMap::new()), rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), @@ -5036,6 +5057,7 @@ impl Timeline { .create_image_layers( &partitions, self.initdb_lsn, + None, ImageLayerCreationMode::Initial, ctx, LastImageLayerCreationStatus::Initial, @@ -5307,14 +5329,19 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? True if we want to generate. 
- async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + force_image_creation_lsn: Option, + ) -> bool { let threshold = self.get_image_creation_threshold(); let guard = self.layers.read(LayerManagerLockHolder::Compaction).await; let Ok(layers) = guard.layer_map() else { return false; }; - + let mut min_image_lsn: Lsn = Lsn::MAX; let mut max_deltas = 0; for part_range in &partition.ranges { let image_coverage = layers.image_coverage(part_range, lsn); @@ -5349,9 +5376,22 @@ impl Timeline { return true; } } + min_image_lsn = min(min_image_lsn, img_lsn); } } + // HADRON + if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 { + info!( + "forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}", + partition.ranges[0].start, + partition.ranges[0].end, + min_image_lsn, + force_image_creation_lsn.unwrap() + ); + return true; + } + debug!( max_deltas, "none of the partitioned ranges had >= {threshold} deltas" @@ -5577,7 +5617,7 @@ impl Timeline { /// suffer from the lack of image layers /// 2. For small tenants (that can mostly fit in RAM), we use a much longer interval fn should_check_if_image_layers_required(self: &Arc, lsn: Lsn) -> bool { - const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024; + let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold; let last_checks_at = self.last_image_layer_creation_check_at.load(); let distance = lsn @@ -5591,12 +5631,12 @@ impl Timeline { let mut time_based_decision = false; let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap(); if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() { - let check_required_after = if Into::::into(&logical_size) >= LARGE_TENANT_THRESHOLD - { - self.get_checkpoint_timeout() - } else { - Duration::from_secs(3600 * 48) - }; + let check_required_after = + if Some(Into::::into(&logical_size)) >= large_timeline_threshold { + self.get_checkpoint_timeout() + } else { + Duration::from_secs(3600 * 48) + }; time_based_decision = match *last_check_instant { Some(last_check) => { @@ -5624,10 +5664,12 @@ impl Timeline { /// true = we have generate all image layers, false = we preempt the process for L0 compaction. /// /// `partition_mode` is only for logging purpose and is not used anywhere in this function. + #[allow(clippy::too_many_arguments)] async fn create_image_layers( self: &Arc, partitioning: &KeyPartitioning, lsn: Lsn, + force_image_creation_lsn: Option, mode: ImageLayerCreationMode, ctx: &RequestContext, last_status: LastImageLayerCreationStatus, @@ -5731,7 +5773,11 @@ impl Timeline { } else if let ImageLayerCreationMode::Try = mode { // check_for_image_layers = false -> skip // check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate - if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await { + if !check_for_image_layers + || !self + .time_for_new_image_layer(partition, lsn, force_image_creation_lsn) + .await + { start = img_range.end; continue; } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 18a0ca852d..171f9d1284 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -4,10 +4,11 @@ //! //! The old legacy algorithm is implemented directly in `timeline.rs`. 
+use std::cmp::min; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use super::layer_manager::LayerManagerLockHolder; use super::{ @@ -33,6 +34,7 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange}; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; +use postgres_ffi::to_pg_timestamp; use serde::Serialize; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::CancellationToken; @@ -45,6 +47,7 @@ use wal_decoder::models::value::Value; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; +use crate::pgdatadir_mapping::LsnForTimestamp; use crate::statvfs::Statvfs; use crate::tenant::checks::check_valid_layermap; use crate::tenant::gc_block::GcBlock; @@ -1267,6 +1270,12 @@ impl Timeline { // Define partitioning schema if needed + // HADRON + let force_image_creation_lsn = self + .get_or_compute_force_image_creation_lsn(cancel, ctx) + .await + .map_err(CompactionError::Other)?; + // 1. L0 Compact let l0_outcome = { let timer = self.metrics.compact_time_histo.start_timer(); @@ -1274,6 +1283,7 @@ impl Timeline { .compact_level0( target_file_size, options.flags.contains(CompactFlags::ForceL0Compaction), + force_image_creation_lsn, ctx, ) .await?; @@ -1376,6 +1386,7 @@ impl Timeline { .create_image_layers( &partitioning, lsn, + force_image_creation_lsn, mode, &image_ctx, self.last_image_layer_creation_status @@ -1472,6 +1483,63 @@ impl Timeline { Ok(CompactionOutcome::Done) } + /* BEGIN_HADRON */ + // Get the force image creation LSN. Compute it if the last computed LSN is too old. + async fn get_or_compute_force_image_creation_lsn( + self: &Arc, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> anyhow::Result> { + const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes + let image_layer_force_creation_period = self.get_image_creation_timeout(); + if image_layer_force_creation_period.is_none() { + return Ok(None); + } + + let image_layer_force_creation_period = image_layer_force_creation_period.unwrap(); + let force_image_creation_lsn_computed_at = + *self.force_image_creation_lsn_computed_at.lock().unwrap(); + if force_image_creation_lsn_computed_at.is_none() + || force_image_creation_lsn_computed_at.unwrap().elapsed() + > FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL + { + let now: SystemTime = SystemTime::now(); + let timestamp = now + .checked_sub(image_layer_force_creation_period) + .ok_or_else(|| { + anyhow::anyhow!( + "image creation timeout is too large: {image_layer_force_creation_period:?}" + ) + })?; + let timestamp = to_pg_timestamp(timestamp); + let force_image_creation_lsn = match self + .find_lsn_for_timestamp(timestamp, cancel, ctx) + .await? 
+ { + LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn, + _ => { + let gc_lsn = *self.get_applied_gc_cutoff_lsn(); + tracing::info!( + "no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}", + gc_lsn + ); + gc_lsn + } + }; + self.force_image_creation_lsn + .store(force_image_creation_lsn); + *self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now()); + tracing::info!( + "computed force image creation LSN: {}", + force_image_creation_lsn + ); + Ok(Some(force_image_creation_lsn)) + } else { + Ok(Some(self.force_image_creation_lsn.load())) + } + } + /* END_HADRON */ + /// Check for layers that are elegible to be rewritten: /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that /// we don't indefinitely retain keys in this shard that aren't needed. @@ -1801,6 +1869,7 @@ impl Timeline { self: &Arc, target_file_size: u64, force_compaction_ignore_threshold: bool, + force_compaction_lsn: Option, ctx: &RequestContext, ) -> Result { let CompactLevel0Phase1Result { @@ -1821,6 +1890,7 @@ impl Timeline { stats, target_file_size, force_compaction_ignore_threshold, + force_compaction_lsn, &ctx, ) .instrument(phase1_span) @@ -1843,6 +1913,7 @@ impl Timeline { mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, force_compaction_ignore_threshold: bool, + force_compaction_lsn: Option, ctx: &RequestContext, ) -> Result { let begin = tokio::time::Instant::now(); @@ -1872,11 +1943,28 @@ impl Timeline { return Ok(CompactLevel0Phase1Result::default()); } } else { - debug!( - level0_deltas = level0_deltas.len(), - threshold, "too few deltas to compact" - ); - return Ok(CompactLevel0Phase1Result::default()); + // HADRON + let min_lsn = level0_deltas + .iter() + .map(|a| a.get_lsn_range().start) + .reduce(min); + if force_compaction_lsn.is_some() + && min_lsn.is_some() + && min_lsn.unwrap() < force_compaction_lsn.unwrap() + { + info!( + "forcing L0 compaction of {} L0 deltas. 
Min lsn: {}, force compaction lsn: {}", + level0_deltas.len(), + min_lsn.unwrap(), + force_compaction_lsn.unwrap() + ); + } else { + debug!( + level0_deltas = level0_deltas.len(), + threshold, "too few deltas to compact" + ); + return Ok(CompactLevel0Phase1Result::default()); + } } } diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 7788faceb4..eaaa3014a5 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -165,6 +165,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "gc_horizon": 23 * (1024 * 1024), "gc_period": "2h 13m", "image_creation_threshold": 7, + "image_layer_force_creation_period": "1m", "pitr_interval": "1m", "lagging_wal_timeout": "23m", "lazy_slru_download": True, diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 1570d40ae9..e67161c6b7 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -944,3 +944,78 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)" ) assert res[0][0] == 1 + + +# BEGIN_HADRON +def get_layer_map(env, tenant_shard_id, timeline_id, ps_id): + client = env.pageservers[ps_id].http_client() + layer_map = client.layer_map_info(tenant_shard_id, timeline_id) + image_layer_count = 0 + delta_layer_count = 0 + for layer in layer_map.historic_layers: + if layer.kind == "Image": + image_layer_count += 1 + elif layer.kind == "Delta": + delta_layer_count += 1 + return image_layer_count, delta_layer_count + + +def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): + """ + Tests that page server can force creating new images if image creation timeout is enabled + """ + # use large knobs to disable L0 compaction/image creation except for the force image creation + tenant_conf = { + "compaction_threshold": "100", + "image_creation_threshold": "100", + "image_layer_creation_check_threshold": "1", + "checkpoint_distance": 10 * 1024, + "checkpoint_timeout": "1s", + "image_layer_force_creation_period": "1s", + # The lsn for forced image layer creations is calculated once every 10 minutes. + # Hence, drive compaction manually such that the test doesn't compute it at the + # wrong time. + "compaction_period": "0s", + } + + # consider every tenant large to run the image layer generation check more eagerly + neon_env_builder.pageserver_config_override = ( + "image_layer_generation_large_timeline_threshold=0" + ) + + neon_env_builder.num_pageservers = 1 + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + endpoint = env.endpoints.create_start("main") + endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") + # Generate some rows. + for v in range(10): + endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))") + + # Sleep a bit such that the inserts are considered when calculating the forced image layer creation LSN. 
+ time.sleep(2) + + def check_force_image_creation(): + ps_http = env.pageserver.http_client() + ps_http.timeline_compact(tenant_id, timeline_id) + image, delta = get_layer_map(env, tenant_id, timeline_id, 0) + log.info(f"images: {image}, deltas: {delta}") + assert image > 0 + + env.pageserver.assert_log_contains("forcing L0 compaction of") + env.pageserver.assert_log_contains("forcing image creation for partitioned range") + + wait_until(check_force_image_creation) + + endpoint.stop_and_destroy() + + env.pageserver.allowed_errors.append( + ".*created delta file of size.*larger than double of target.*" + ) + + +# END_HADRON From f4b03ddd7b4cb858430b16d642f5f80f11e8b5b1 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 10 Jul 2025 12:18:37 +0200 Subject: [PATCH 11/15] pageserver/client_grpc: reap idle pool resources (#12476) ## Problem The gRPC client pools don't reap idle resources. Touches #11735. Requires #12475. ## Summary of changes Reap idle pool resources (channels/clients/streams) after 3 minutes of inactivity. Also restructure the `StreamPool` to use a mutex rather than atomics for synchronization, for simplicity. This will be optimized later. --- Cargo.lock | 1 + pageserver/client_grpc/Cargo.toml | 4 + pageserver/client_grpc/src/pool.rs | 241 ++++++++++++++++++++++++----- 3 files changed, 207 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index caed814d5f..4150944ad0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4506,6 +4506,7 @@ dependencies = [ "pageserver_page_api", "tokio", "tokio-stream", + "tokio-util", "tonic 0.13.1", "tracing", "utils", diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index 84e27abb84..ca224900ac 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +testing = ["pageserver_api/testing"] + [dependencies] anyhow.workspace = true bytes.workspace = true @@ -13,6 +16,7 @@ pageserver_api.workspace = true pageserver_page_api.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true tonic.workspace = true tracing.workspace = true utils.workspace = true diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 5a50004fd1..89b3bd646f 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -34,10 +34,12 @@ use std::num::NonZero; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; +use std::time::{Duration, Instant}; use futures::StreamExt as _; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; +use tokio_util::sync::CancellationToken; use tonic::transport::{Channel, Endpoint}; use tracing::{error, warn}; @@ -45,6 +47,25 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; +/// Reap channels/clients/streams that have been idle for this long. +/// +/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to +/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle. +/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we +/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to +/// account for it when setting the reap threshold. 
Alternatively, we can immediately reap empty +/// channels, and/or stream pool clients. +const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) { + false => Duration::from_secs(180), + true => Duration::from_secs(1), // exercise reaping in tests +}; + +/// Reap idle resources with this interval. +const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) { + false => Duration::from_secs(10), + true => Duration::from_secs(1), // exercise reaping in tests +}; + /// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2 /// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this. /// The pool does not limit the number of channels, and instead relies on `ClientPool` or @@ -52,7 +73,6 @@ use utils::shard::ShardIndex; /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// -/// TODO: reap idle channels. /// TODO: consider prewarming a set of channels, to avoid initial connection latency. /// TODO: consider adding a circuit breaker for errors and fail fast. pub struct ChannelPool { @@ -62,6 +82,8 @@ pub struct ChannelPool { max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, + /// Reaps idle channels. + idle_reaper: Reaper, /// Channel ID generator. next_channel_id: AtomicUsize, } @@ -73,6 +95,9 @@ struct ChannelEntry { channel: Channel, /// Number of clients using this channel. clients: usize, + /// The channel has been idle (no clients) since this time. None if channel is in use. + /// INVARIANT: Some if clients == 0, otherwise None. + idle_since: Option, } impl ChannelPool { @@ -82,12 +107,15 @@ impl ChannelPool { E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { - Ok(Arc::new(Self { + let pool = Arc::new(Self { endpoint: endpoint.try_into()?, max_clients_per_channel, channels: Mutex::default(), + idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), next_channel_id: AtomicUsize::default(), - })) + }); + pool.idle_reaper.spawn(&pool); + Ok(pool) } /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel. @@ -116,8 +144,14 @@ impl ChannelPool { entry.clients <= self.max_clients_per_channel.get(), "channel overflow" ); + assert_eq!( + entry.idle_since.is_some(), + entry.clients == 0, + "incorrect channel idle state" + ); if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; + entry.idle_since = None; return ChannelGuard { pool: Arc::downgrade(self), id, @@ -134,6 +168,7 @@ impl ChannelPool { let entry = ChannelEntry { channel: channel.clone(), clients: 1, // account for the guard below + idle_since: None, }; channels.insert(id, entry); @@ -145,6 +180,20 @@ impl ChannelPool { } } +impl Reapable for ChannelPool { + /// Reaps channels that have been idle since before the cutoff. + fn reap_idle(&self, cutoff: Instant) { + self.channels.lock().unwrap().retain(|_, entry| { + let Some(idle_since) = entry.idle_since else { + assert_ne!(entry.clients, 0, "empty channel not marked idle"); + return true; + }; + assert_eq!(entry.clients, 0, "idle channel has clients"); + idle_since >= cutoff + }) + } +} + /// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`, /// since the gRPC client requires an owned `Channel`. 
pub struct ChannelGuard { @@ -167,10 +216,15 @@ impl Drop for ChannelGuard { let Some(pool) = self.pool.upgrade() else { return; // pool was dropped }; + let mut channels = pool.channels.lock().unwrap(); let entry = channels.get_mut(&self.id).expect("unknown channel"); + assert!(entry.idle_since.is_none(), "active channel marked idle"); assert!(entry.clients > 0, "channel underflow"); entry.clients -= 1; + if entry.clients == 0 { + entry.idle_since = Some(Instant::now()); // mark channel as idle + } } } @@ -179,8 +233,6 @@ impl Drop for ChannelGuard { /// number of concurrent clients to `max_clients` via semaphore. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. -/// -/// TODO: reap idle clients. pub struct ClientPool { /// Tenant ID. tenant_id: TenantId, @@ -201,6 +253,8 @@ pub struct ClientPool { /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle /// clients are reaped. idle: Mutex>, + /// Reaps idle clients. + idle_reaper: Reaper, /// Unique client ID generator. next_client_id: AtomicUsize, } @@ -212,6 +266,9 @@ struct ClientEntry { client: page_api::Client, /// The channel guard for the channel used by the client. channel_guard: ChannelGuard, + /// The client has been idle since this time. All clients in `ClientPool::idle` are idle by + /// definition, so this is the time when it was added back to the pool. + idle_since: Instant, } impl ClientPool { @@ -226,16 +283,19 @@ impl ClientPool { auth_token: Option, max_clients: Option>, ) -> Arc { - Arc::new(Self { + let pool = Arc::new(Self { tenant_id, timeline_id, shard_id, auth_token, channel_pool, idle: Mutex::default(), + idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))), next_client_id: AtomicUsize::default(), - }) + }); + pool.idle_reaper.spawn(&pool); + pool } /// Gets a client from the pool, or creates a new one if necessary. Connections are established @@ -287,6 +347,16 @@ impl ClientPool { } } +impl Reapable for ClientPool { + /// Reaps clients that have been idle since before the cutoff. + fn reap_idle(&self, cutoff: Instant) { + self.idle + .lock() + .unwrap() + .retain(|_, entry| entry.idle_since >= cutoff) + } +} + /// A client acquired from the pool. The inner client can be accessed via Deref. The client is /// returned to the pool when dropped. pub struct ClientGuard { @@ -317,9 +387,11 @@ impl Drop for ClientGuard { let Some(pool) = self.pool.upgrade() else { return; // pool was dropped }; + let entry = ClientEntry { client: self.client.take().expect("dropped once"), channel_guard: self.channel_guard.take().expect("dropped once"), + idle_since: Instant::now(), }; pool.idle.lock().unwrap().insert(self.id, entry); @@ -334,7 +406,6 @@ impl Drop for ClientGuard { /// a single request and await the response. Internally, requests are multiplexed across streams and /// channels. This allows proper queue depth enforcement and response routing. /// -/// TODO: reap idle streams. /// TODO: consider making this generic over request and response types; not currently needed. pub struct StreamPool { /// The client pool to acquire clients from. Must be unbounded. @@ -344,7 +415,7 @@ pub struct StreamPool { /// Incoming requests will be sent over an existing stream with available capacity. If all /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). 
Each /// stream has an associated Tokio task that processes requests and responses. - streams: Arc>>, + streams: Mutex>, /// The max number of concurrent streams, or None if unbounded. max_streams: Option>, /// The max number of concurrent requests per stream. @@ -352,6 +423,8 @@ pub struct StreamPool { /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. /// None if the pool is unbounded. limiter: Option>, + /// Reaps idle streams. + idle_reaper: Reaper, /// Stream ID generator. next_stream_id: AtomicUsize, } @@ -364,9 +437,11 @@ type ResponseSender = oneshot::Sender>; struct StreamEntry { /// Sends caller requests to the stream task. The stream task exits when this is dropped. sender: RequestSender, - /// Number of in-flight requests on this stream. This is an atomic to allow decrementing it on - /// completion without acquiring the `StreamPool::streams` lock. - queue_depth: Arc, + /// Number of in-flight requests on this stream. + queue_depth: usize, + /// The time when this stream went idle (queue_depth == 0). + /// INVARIANT: Some if queue_depth == 0, otherwise None. + idle_since: Option, } impl StreamPool { @@ -383,16 +458,19 @@ impl StreamPool { max_queue_depth: NonZero, ) -> Arc { assert!(client_pool.limiter.is_none(), "bounded client pool"); - Arc::new(Self { + let pool = Arc::new(Self { client_pool, - streams: Arc::default(), + streams: Mutex::default(), limiter: max_streams.map(|max_streams| { Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) }), max_streams, max_queue_depth, + idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), next_stream_id: AtomicUsize::default(), - }) + }); + pool.idle_reaper.spawn(&pool); + pool } /// Acquires an available stream from the pool, or spins up a new stream async if all streams @@ -412,8 +490,8 @@ impl StreamPool { /// * Allow concurrent clients to join onto streams while they're spun up. /// * Allow spinning up multiple streams concurrently, but don't overshoot limits. /// - /// For now, we just do something simple and functional, but very inefficient (linear scan). - pub async fn get(&self) -> StreamGuard { + /// For now, we just do something simple but inefficient (linear scan under mutex). + pub async fn get(self: &Arc) -> StreamGuard { // Acquire a permit if the pool is bounded. let mut permit = None; if let Some(limiter) = self.limiter.clone() { @@ -422,23 +500,23 @@ impl StreamPool { let mut streams = self.streams.lock().unwrap(); // Look for a pooled stream with available capacity. - for entry in streams.values() { + for (&id, entry) in streams.iter_mut() { assert!( - entry.queue_depth.load(Ordering::Relaxed) <= self.max_queue_depth.get(), + entry.queue_depth <= self.max_queue_depth.get(), "stream queue overflow" ); - if entry - .queue_depth - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| { - // Increment the queue depth via compare-and-swap. - // TODO: review ordering. - (queue_depth < self.max_queue_depth.get()).then_some(queue_depth + 1) - }) - .is_ok() - { + assert_eq!( + entry.idle_since.is_some(), + entry.queue_depth == 0, + "incorrect stream idle state" + ); + if entry.queue_depth < self.max_queue_depth.get() { + entry.queue_depth += 1; + entry.idle_since = None; return StreamGuard { + pool: Arc::downgrade(self), + id, sender: entry.sender.clone(), - queue_depth: entry.queue_depth.clone(), permit, }; } @@ -448,11 +526,11 @@ impl StreamPool { // return the guard, while spinning up the stream task async. 
This allows other callers to // join onto this stream and also create additional streams concurrently if this fills up. let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); - let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); let entry = StreamEntry { sender: req_tx.clone(), - queue_depth: queue_depth.clone(), + queue_depth: 1, // reserve quota for this caller + idle_since: None, }; streams.insert(id, entry); @@ -461,20 +539,23 @@ impl StreamPool { }; let client_pool = self.client_pool.clone(); - let streams = self.streams.clone(); + let pool = Arc::downgrade(self); tokio::spawn(async move { if let Err(err) = Self::run_stream(client_pool, req_rx).await { error!("stream failed: {err}"); } - // Remove stream from pool on exit. - let entry = streams.lock().unwrap().remove(&id); - assert!(entry.is_some(), "unknown stream ID: {id}"); + // Remove stream from pool on exit. Weak reference to avoid holding the pool alive. + if let Some(pool) = pool.upgrade() { + let entry = pool.streams.lock().unwrap().remove(&id); + assert!(entry.is_some(), "unknown stream ID: {id}"); + } }); StreamGuard { + pool: Arc::downgrade(self), + id, sender: req_tx, - queue_depth, permit, } } @@ -552,11 +633,26 @@ impl StreamPool { } } +impl Reapable for StreamPool { + /// Reaps streams that have been idle since before the cutoff. + fn reap_idle(&self, cutoff: Instant) { + self.streams.lock().unwrap().retain(|_, entry| { + let Some(idle_since) = entry.idle_since else { + assert_ne!(entry.queue_depth, 0, "empty stream not marked idle"); + return true; + }; + assert_eq!(entry.queue_depth, 0, "idle stream has requests"); + idle_since >= cutoff + }); + } +} + /// A pooled stream reference. Can be used to send a single request, to properly enforce queue /// depth. Queue depth is already reserved and will be returned on drop. pub struct StreamGuard { + pool: Weak, + id: StreamID, sender: RequestSender, - queue_depth: Arc, permit: Option, // None if pool is unbounded } @@ -588,11 +684,78 @@ impl StreamGuard { impl Drop for StreamGuard { fn drop(&mut self) { + let Some(pool) = self.pool.upgrade() else { + return; // pool was dropped + }; + // Release the queue depth reservation on drop. This can prematurely decrement it if dropped // before the response is received, but that's okay. - let prev_queue_depth = self.queue_depth.fetch_sub(1, Ordering::SeqCst); - assert!(prev_queue_depth > 0, "stream queue underflow"); + let mut streams = pool.streams.lock().unwrap(); + let entry = streams.get_mut(&self.id).expect("unknown stream"); + assert!(entry.idle_since.is_none(), "active stream marked idle"); + assert!(entry.queue_depth > 0, "stream queue underflow"); + entry.queue_depth -= 1; + if entry.queue_depth == 0 { + entry.idle_since = Some(Instant::now()); // mark stream as idle + } _ = self.permit; // returned on drop, referenced for visibility } } + +/// Periodically reaps idle resources from a pool. +struct Reaper { + /// The task check interval. + interval: Duration, + /// The threshold for reaping idle resources. + threshold: Duration, + /// Cancels the reaper task. Cancelled when the reaper is dropped. + cancel: CancellationToken, +} + +impl Reaper { + /// Creates a new reaper. + pub fn new(threshold: Duration, interval: Duration) -> Self { + Self { + cancel: CancellationToken::new(), + threshold, + interval, + } + } + + /// Spawns a task to periodically reap idle resources from the given task pool. 
The task is + /// cancelled when the reaper is dropped. + pub fn spawn(&self, pool: &Arc) { + // NB: hold a weak pool reference, otherwise the task will prevent dropping the pool. + let pool = Arc::downgrade(pool); + let cancel = self.cancel.clone(); + let (interval, threshold) = (self.interval, self.threshold); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = tokio::time::sleep(interval) => { + let Some(pool) = pool.upgrade() else { + return; // pool was dropped + }; + pool.reap_idle(Instant::now() - threshold); + } + + _ = cancel.cancelled() => return, + } + } + }); + } +} + +impl Drop for Reaper { + fn drop(&mut self) { + self.cancel.cancel(); // cancel reaper task + } +} + +/// A reapable resource pool. +trait Reapable: Send + Sync + 'static { + /// Reaps resources that have been idle since before the given cutoff. + fn reap_idle(&self, cutoff: Instant); +} From bdca5b500b078eb9afb528fd464f496e07c97024 Mon Sep 17 00:00:00 2001 From: Mikhail Date: Thu, 10 Jul 2025 12:11:53 +0100 Subject: [PATCH 12/15] Fix test_lfc_prewarm: reduce number of prewarms, sleep before LFC offloading (#12515) Fixes: - Sleep before LFC offloading in `test_lfc_prewarm[autoprewarm]` to ensure offloaded LFC is the one exported after all writes finish - Reduce number of prewarms and increase timeout in `test_lfc_prewarm_under_workload` as debug builds were failing due to timeout. Additional changes: - Remove `check_pinned_entries`: https://github.com/neondatabase/neon/pull/12447#discussion_r2185946210 - Fix LFC error metrics description: https://github.com/neondatabase/neon/pull/12486#discussion_r2190763107 --- compute_tools/src/metrics.rs | 4 +-- test_runner/regress/test_lfc_prewarm.py | 44 +++++++++++-------------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs index 91dedbb42a..6e4df73c0f 100644 --- a/compute_tools/src/metrics.rs +++ b/compute_tools/src/metrics.rs @@ -108,7 +108,7 @@ pub(crate) static LFC_PREWARMS: Lazy = Lazy::new(|| { pub(crate) static LFC_PREWARM_ERRORS: Lazy = Lazy::new(|| { register_int_counter!( "compute_ctl_lfc_prewarm_errors_total", - "Total number of LFC prewarms errors requested by compute_ctl or autoprewarm option", + "Total number of LFC prewarm errors", ) .expect("failed to define a metric") }); @@ -124,7 +124,7 @@ pub(crate) static LFC_OFFLOADS: Lazy = Lazy::new(|| { pub(crate) static LFC_OFFLOAD_ERRORS: Lazy = Lazy::new(|| { register_int_counter!( "compute_ctl_lfc_offload_errors_total", - "Total number of LFC offload errors requested by compute_ctl or lfc_offload_period_seconds option", + "Total number of LFC offload errors", ) .expect("failed to define a metric") }); diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index ae36bbda79..22e5bf576f 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,6 +1,7 @@ import random import threading from enum import StrEnum +from time import sleep from typing import Any import pytest @@ -24,18 +25,7 @@ OFFLOAD_LABEL = "compute_ctl_lfc_offloads_total" OFFLOAD_ERR_LABEL = "compute_ctl_lfc_offload_errors_total" METHOD_VALUES = [e for e in PrewarmMethod] METHOD_IDS = [e.value for e in PrewarmMethod] - - -def check_pinned_entries(cur: Cursor): - """ - Wait till none of LFC buffers are pinned - """ - - def none_pinned(): - cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'") - assert cur.fetchall()[0][0] == 0 - - 
wait_until(none_pinned) +AUTOOFFLOAD_INTERVAL_SECS = 2 def prom_parse(client: EndpointHttpClient) -> dict[str, float]: @@ -49,9 +39,18 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]: def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any: + if method == PrewarmMethod.POSTGRES: + cur.execute("select get_local_cache_state()") + return cur.fetchall()[0][0] + if method == PrewarmMethod.AUTOPREWARM: + # With autoprewarm, we need to be sure LFC was offloaded after all writes + # finish, so we sleep. Otherwise we'll have less prewarmed pages than we want + sleep(AUTOOFFLOAD_INTERVAL_SECS) client.offload_lfc_wait() - elif method == PrewarmMethod.COMPUTE_CTL: + return + + if method == PrewarmMethod.COMPUTE_CTL: status = client.prewarm_lfc_status() assert status["status"] == "not_prewarmed" assert "error" not in status @@ -60,11 +59,9 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) parsed = prom_parse(client) desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0} assert parsed == desired, f"{parsed=} != {desired=}" - elif method == PrewarmMethod.POSTGRES: - cur.execute("select get_local_cache_state()") - return cur.fetchall()[0][0] - else: - raise AssertionError(f"{method} not in PrewarmMethod") + return + + raise AssertionError(f"{method} not in PrewarmMethod") def prewarm_endpoint( @@ -106,14 +103,13 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): "neon.file_cache_size_limit=1GB", "neon.file_cache_prewarm_limit=1000", ] - offload_secs = 2 if method == PrewarmMethod.AUTOPREWARM: endpoint = env.endpoints.create_start( branch_name="main", config_lines=cfg, autoprewarm=True, - offload_lfc_interval_seconds=offload_secs, + offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS, ) else: endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg) @@ -135,7 +131,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): endpoint.stop() if method == PrewarmMethod.AUTOPREWARM: - endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=offload_secs) + endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS) else: endpoint.start() @@ -162,7 +158,6 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): lfc_cur.execute("select sum(pk) from t") assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 - check_pinned_entries(pg_cur) desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped} check_prewarmed(method, client, desired) @@ -243,9 +238,9 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet prewarm_thread.start() def prewarmed(): - assert n_prewarms > 5 + assert n_prewarms > 3 - wait_until(prewarmed) + wait_until(prewarmed, timeout=40) # debug builds don't finish in 20s running = False for t in workload_threads: @@ -256,7 +251,6 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet total_balance = lfc_cur.fetchall()[0][0] assert total_balance == 0 - check_pinned_entries(pg_cur) if method == PrewarmMethod.POSTGRES: return desired = { From ffeede085e3008616872372ac98edcef573c8677 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 10 Jul 2025 12:58:22 +0100 Subject: [PATCH 13/15] libs: move metric collection for pageserver and safekeeper in a background task (#12525) ## Problem Safekeeper and pageserver metrics collection might time out. 
We've seen this in both hadron and neon. ## Summary of changes This PR moves metrics collection in PS/SK to the background so that we will always get some metrics, despite there may be some delays. Will leave it to the future work to reduce metrics collection time. --------- Co-authored-by: Chen Luo --- libs/http-utils/src/endpoint.rs | 37 ++++++++- libs/pageserver_api/src/config.rs | 2 + libs/utils/src/lib.rs | 2 + libs/utils/src/metrics_collector.rs | 75 +++++++++++++++++++ pageserver/src/bin/pageserver.rs | 41 +++++++++- pageserver/src/config.rs | 6 ++ pageserver/src/http/routes.rs | 7 +- pageserver/src/lib.rs | 12 +++ safekeeper/src/bin/safekeeper.rs | 27 +++++++ safekeeper/src/http/routes.rs | 9 ++- safekeeper/src/lib.rs | 2 + safekeeper/src/wal_backup.rs | 2 +- .../tests/walproposer_sim/safekeeper.rs | 1 + test_runner/fixtures/pageserver/http.py | 2 +- test_runner/fixtures/safekeeper/http.py | 2 +- 15 files changed, 217 insertions(+), 10 deletions(-) create mode 100644 libs/utils/src/metrics_collector.rs diff --git a/libs/http-utils/src/endpoint.rs b/libs/http-utils/src/endpoint.rs index f32ced1180..a61bf8e08a 100644 --- a/libs/http-utils/src/endpoint.rs +++ b/libs/http-utils/src/endpoint.rs @@ -20,6 +20,7 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; use tracing::{Instrument, debug, info, info_span, warn}; use utils::auth::{AuthError, Claims, SwappableJwtAuth}; +use utils::metrics_collector::{METRICS_COLLECTOR, METRICS_STALE_MILLIS}; use crate::error::{ApiError, api_error_handler, route_error_handler}; use crate::request::{get_query_param, parse_query_param}; @@ -250,9 +251,28 @@ impl std::io::Write for ChannelWriter { } } -pub async fn prometheus_metrics_handler(_req: Request) -> Result, ApiError> { +pub async fn prometheus_metrics_handler( + req: Request, + force_metric_collection_on_scrape: bool, +) -> Result, ApiError> { SERVE_METRICS_COUNT.inc(); + // HADRON + let requested_use_latest = parse_query_param(&req, "use_latest")?; + + let use_latest = match requested_use_latest { + None => force_metric_collection_on_scrape, + Some(true) => true, + Some(false) => { + if force_metric_collection_on_scrape { + // We don't cache in this case + true + } else { + false + } + } + }; + let started_at = std::time::Instant::now(); let (tx, rx) = mpsc::channel(1); @@ -277,12 +297,18 @@ pub async fn prometheus_metrics_handler(_req: Request) -> Result) -> Result { tracing::info!( @@ -303,6 +333,7 @@ pub async fn prometheus_metrics_handler(_req: Request) -> Result, #[serde(skip_serializing_if = "Option::is_none")] pub image_layer_generation_large_timeline_threshold: Option, + pub force_metric_collection_on_scrape: bool, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -831,6 +832,7 @@ impl Default for ConfigToml { basebackup_cache_config: None, posthog_config: None, image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024), + force_metric_collection_on_scrape: true, } } } diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 11f787562c..2b81da017d 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -99,6 +99,8 @@ pub mod elapsed_accum; #[cfg(target_os = "linux")] pub mod linux_socket_ioctl; +pub mod metrics_collector; + // Re-export used in macro. Avoids adding git-version as dep in target crates. 
#[doc(hidden)] pub use git_version; diff --git a/libs/utils/src/metrics_collector.rs b/libs/utils/src/metrics_collector.rs new file mode 100644 index 0000000000..9e57fcd643 --- /dev/null +++ b/libs/utils/src/metrics_collector.rs @@ -0,0 +1,75 @@ +use std::{ + sync::{Arc, RwLock}, + time::{Duration, Instant}, +}; + +use metrics::{IntGauge, proto::MetricFamily, register_int_gauge}; +use once_cell::sync::Lazy; + +pub static METRICS_STALE_MILLIS: Lazy = Lazy::new(|| { + register_int_gauge!( + "metrics_metrics_stale_milliseconds", + "The current metrics stale time in milliseconds" + ) + .expect("failed to define a metric") +}); + +#[derive(Debug)] +pub struct CollectedMetrics { + pub metrics: Vec, + pub collected_at: Instant, +} + +impl CollectedMetrics { + fn new(metrics: Vec) -> Self { + Self { + metrics, + collected_at: Instant::now(), + } + } +} + +#[derive(Debug)] +pub struct MetricsCollector { + last_collected: RwLock>, +} + +impl MetricsCollector { + pub fn new() -> Self { + Self { + last_collected: RwLock::new(Arc::new(CollectedMetrics::new(vec![]))), + } + } + + #[tracing::instrument(name = "metrics_collector", skip_all)] + pub fn run_once(&self, cache_metrics: bool) -> Arc { + let started = Instant::now(); + let metrics = metrics::gather(); + let collected = Arc::new(CollectedMetrics::new(metrics)); + if cache_metrics { + let mut guard = self.last_collected.write().unwrap(); + *guard = collected.clone(); + } + tracing::info!( + "Collected {} metric families in {} ms", + collected.metrics.len(), + started.elapsed().as_millis() + ); + collected + } + + pub fn last_collected(&self) -> Arc { + self.last_collected.read().unwrap().clone() + } +} + +impl Default for MetricsCollector { + fn default() -> Self { + Self::new() + } +} + +// Interval for metrics collection. 
Currently hard-coded to be the same as the metrics scape interval from the obs agent +pub static METRICS_COLLECTION_INTERVAL: Duration = Duration::from_secs(30); + +pub static METRICS_COLLECTOR: Lazy = Lazy::new(MetricsCollector::default); diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 78aba25d2e..299fe7e159 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -29,8 +29,8 @@ use pageserver::task_mgr::{ }; use pageserver::tenant::{TenantSharedResources, mgr, secondary}; use pageserver::{ - CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http, - page_cache, page_service, task_mgr, virtual_file, + CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, + MetricsCollectionTask, http, page_cache, page_service, task_mgr, virtual_file, }; use postgres_backend::AuthType; use remote_storage::GenericRemoteStorage; @@ -41,6 +41,7 @@ use tracing_utils::OtelGuard; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::crashsafe::syncfs; use utils::logging::TracingErrorLayerEnablement; +use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR}; use utils::sentry_init::init_sentry; use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener}; @@ -763,6 +764,41 @@ fn start_pageserver( (http_task, https_task) }; + /* BEGIN_HADRON */ + let metrics_collection_task = { + let cancel = shutdown_pageserver.child_token(); + let task = crate::BACKGROUND_RUNTIME.spawn({ + let cancel = cancel.clone(); + let background_jobs_barrier = background_jobs_barrier.clone(); + async move { + if conf.force_metric_collection_on_scrape { + return; + } + + // first wait until background jobs are cleared to launch. + tokio::select! { + _ = cancel.cancelled() => { return; }, + _ = background_jobs_barrier.wait() => {} + }; + let mut interval = tokio::time::interval(METRICS_COLLECTION_INTERVAL); + loop { + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("cancelled metrics collection task, exiting..."); + break; + }, + _ = interval.tick() => {} + } + tokio::task::spawn_blocking(|| { + METRICS_COLLECTOR.run_once(true); + }); + } + } + }); + MetricsCollectionTask(CancellableTask { task, cancel }) + }; + /* END_HADRON */ + let consumption_metrics_tasks = { let cancel = shutdown_pageserver.child_token(); let task = crate::BACKGROUND_RUNTIME.spawn({ @@ -844,6 +880,7 @@ fn start_pageserver( https_endpoint_listener, page_service, page_service_grpc, + metrics_collection_task, consumption_metrics_tasks, disk_usage_eviction_task, &tenant_manager, diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f64c5838ff..bb73ae1dd5 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -256,6 +256,10 @@ pub struct PageServerConf { /// Defines what is a big tenant for the purpose of image layer generation. /// See Timeline::should_check_if_image_layers_required pub image_layer_generation_large_timeline_threshold: Option, + + /// Controls whether to collect all metrics on each scrape or to return potentially stale + /// results. 
+ pub force_metric_collection_on_scrape: bool, } /// Token for authentication to safekeepers @@ -437,6 +441,7 @@ impl PageServerConf { timeline_import_config, basebackup_cache_config, image_layer_generation_large_timeline_threshold, + force_metric_collection_on_scrape, } = config_toml; let mut conf = PageServerConf { @@ -496,6 +501,7 @@ impl PageServerConf { timeline_import_config, basebackup_cache_config, image_layer_generation_large_timeline_threshold, + force_metric_collection_on_scrape, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 767bba49e2..ed0a5440cb 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3938,9 +3938,14 @@ pub fn make_router( .expect("construct launch timestamp header middleware"), ); + let force_metric_collection_on_scrape = state.conf.force_metric_collection_on_scrape; + + let prometheus_metrics_handler_wrapper = + move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape); + Ok(router .data(state) - .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) + .get("/metrics", move |r| request_span(r, prometheus_metrics_handler_wrapper)) .get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) .get("/profile/heap", |r| request_span(r, profile_heap_handler)) .get("/v1/status", |r| api_handler(r, status_handler)) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 0dd3c465e0..0864026f6b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -73,6 +73,9 @@ pub struct HttpEndpointListener(pub CancellableTask); pub struct HttpsEndpointListener(pub CancellableTask); pub struct ConsumptionMetricsTasks(pub CancellableTask); pub struct DiskUsageEvictionTask(pub CancellableTask); +// HADRON +pub struct MetricsCollectionTask(pub CancellableTask); + impl CancellableTask { pub async fn shutdown(self) { self.cancel.cancel(); @@ -87,6 +90,7 @@ pub async fn shutdown_pageserver( https_listener: Option, page_service: page_service::Listener, grpc_task: Option, + metrics_collection_task: MetricsCollectionTask, consumption_metrics_worker: ConsumptionMetricsTasks, disk_usage_eviction_task: Option, tenant_manager: &TenantManager, @@ -211,6 +215,14 @@ pub async fn shutdown_pageserver( // Best effort to persist any outstanding deletions, to avoid leaking objects deletion_queue.shutdown(Duration::from_secs(5)).await; + // HADRON + timed( + metrics_collection_task.0.shutdown(), + "shutdown metrics collections metrics", + Duration::from_secs(1), + ) + .await; + timed( consumption_metrics_worker.0.shutdown(), "shutdown consumption metrics", diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 8fda625817..b2d5976ef4 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -37,6 +37,7 @@ use tracing::*; use utils::auth::{JwtAuth, Scope, SwappableJwtAuth}; use utils::id::NodeId; use utils::logging::{self, LogFormat, SecretString}; +use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR}; use utils::sentry_init::init_sentry; use utils::{pid_file, project_build_tag, project_git_version, tcp_listener}; @@ -243,6 +244,11 @@ struct Args { #[arg(long)] enable_tls_wal_service_api: bool, + /// Controls whether to collect all metrics on each scrape or to return potentially stale + /// results. 
+ #[arg(long, default_value_t = true)] + force_metric_collection_on_scrape: bool, + /// Run in development mode (disables security checks) #[arg(long, help = "Run in development mode (disables security checks)")] dev: bool, @@ -428,6 +434,7 @@ async fn main() -> anyhow::Result<()> { ssl_ca_certs, use_https_safekeeper_api: args.use_https_safekeeper_api, enable_tls_wal_service_api: args.enable_tls_wal_service_api, + force_metric_collection_on_scrape: args.force_metric_collection_on_scrape, }); // initialize sentry if SENTRY_DSN is provided @@ -640,6 +647,26 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); + /* BEGIN_HADRON */ + if conf.force_metric_collection_on_scrape { + let metrics_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| BACKGROUND_RUNTIME.handle()) + .spawn(async move { + let mut interval: tokio::time::Interval = + tokio::time::interval(METRICS_COLLECTION_INTERVAL); + loop { + interval.tick().await; + tokio::task::spawn_blocking(|| { + METRICS_COLLECTOR.run_once(true); + }); + } + }) + .map(|res| ("broker main".to_owned(), res)); + tasks_handles.push(Box::pin(metrics_handle)); + } + /* END_HADRON */ + set_build_info_metric(GIT_VERSION, BUILD_TAG); // TODO: update tokio-stream, convert to real async Stream with diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 384c582678..4b061c65d9 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -699,6 +699,11 @@ pub fn make_router( })) } + let force_metric_collection_on_scrape = conf.force_metric_collection_on_scrape; + + let prometheus_metrics_handler_wrapper = + move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape); + // NB: on any changes do not forget to update the OpenAPI spec // located nearby (/safekeeper/src/http/openapi_spec.yaml). 
let auth = conf.http_auth.clone(); @@ -706,7 +711,9 @@ pub fn make_router( .data(conf) .data(global_timelines) .data(auth) - .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) + .get("/metrics", move |r| { + request_span(r, prometheus_metrics_handler_wrapper) + }) .get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) .get("/profile/heap", |r| request_span(r, profile_heap_handler)) .get("/v1/status", |r| request_span(r, status_handler)) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index c461c071da..c0b5403ebf 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -134,6 +134,7 @@ pub struct SafeKeeperConf { pub ssl_ca_certs: Vec, pub use_https_safekeeper_api: bool, pub enable_tls_wal_service_api: bool, + pub force_metric_collection_on_scrape: bool, } impl SafeKeeperConf { @@ -183,6 +184,7 @@ impl SafeKeeperConf { ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, enable_tls_wal_service_api: false, + force_metric_collection_on_scrape: true, } } } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 7e10847a1b..0e8dfd64c3 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -166,7 +166,7 @@ fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option) -> Result<()> { ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, enable_tls_wal_service_api: false, + force_metric_collection_on_scrape: true, }; let mut global = GlobalMap::new(disk, conf.clone())?; diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 79cfba8da6..8e7d957b22 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -1002,7 +1002,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): def get_metrics_str(self) -> str: """You probably want to use get_metrics() instead.""" - res = self.get(f"http://localhost:{self.port}/metrics") + res = self.get(f"http://localhost:{self.port}/metrics?use_latest=true") self.verbose_error(res) return res.text diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 942b620be6..ceb00c0f90 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -143,7 +143,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): def get_metrics_str(self) -> str: """You probably want to use get_metrics() instead.""" - request_result = self.get(f"http://localhost:{self.port}/metrics") + request_result = self.get(f"http://localhost:{self.port}/metrics?use_latest=true") request_result.raise_for_status() return request_result.text From d33b3c7457e1bbe15f1961ebec749249c6f77f5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 10 Jul 2025 16:03:20 +0200 Subject: [PATCH 14/15] Print viability via custom printing impl (#12544) As per https://github.com/neondatabase/neon/pull/12485#issuecomment-3056525882 , we don't want to print the viability error via a debug impl as it prints the backtrace. SafekeeperInfo doesn't have a display impl, so fall back to `Debug` for the `Ok` case. It gives single line output so it's okay to use `Debug` for it. 
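Roughly, the pattern is the one sketched below (a minimal illustration only; the `describe` helper is hypothetical — the actual change formats the `safekeepers_for_new_timeline()` result inline):

```rust
/// Sketch: format a Result by hand so the error goes through its Display impl
/// (single line, no backtrace), while the Ok value falls back to Debug.
fn describe<T: std::fmt::Debug, E: std::fmt::Display>(res: &Result<T, E>) -> String {
    match res {
        Ok(v) => format!("Ok({v:?})"),  // Debug is fine here: single-line output
        Err(e) => format!("Err({e})"),  // Display: omits the backtrace
    }
}
```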
Follow up of https://github.com/neondatabase/neon/pull/12485 --- storage_controller/src/service.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index d2f7287be9..3844570b47 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1984,11 +1984,14 @@ impl Service { }); // Check that there is enough safekeepers configured that we can create new timelines - let test_sk_res = this.safekeepers_for_new_timeline().await; + let test_sk_res_str = match this.safekeepers_for_new_timeline().await { + Ok(v) => format!("Ok({v:?})"), + Err(v) => format!("Err({v:})"), + }; tracing::info!( timeline_safekeeper_count = config.timeline_safekeeper_count, timelines_onto_safekeepers = config.timelines_onto_safekeepers, - "viability test result (test timeline creation on safekeepers): {test_sk_res:?}", + "viability test result (test timeline creation on safekeepers): {test_sk_res_str}", ); Ok(this) From be5bbaecadda71478638608c469c184aaf124bf5 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Thu, 10 Jul 2025 10:28:58 -0400 Subject: [PATCH 15/15] fix(storcon): correctly handle 404 error in lsn lease (#12537) ## Problem close LKB-253 ## Summary of changes 404 for timeline requests could happen when the tenant is intended to be on a pageserver but not attached yet. This patch adds handling for the lease request. In the future, we should extend this handling to more operations. --------- Signed-off-by: Alex Chi Z --- storage_controller/src/service.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 3844570b47..9c1b81d261 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -4761,6 +4761,7 @@ impl Service { ) .await; + let mut retry_if_not_attached = false; let targets = { let locked = self.inner.read().unwrap(); let mut targets = Vec::new(); @@ -4777,6 +4778,24 @@ impl Service { .expect("Pageservers may not be deleted while referenced"); targets.push((*tenant_shard_id, node.clone())); + + if let Some(location) = shard.observed.locations.get(node_id) { + if let Some(ref conf) = location.conf { + if conf.mode != LocationConfigMode::AttachedSingle + && conf.mode != LocationConfigMode::AttachedMulti + { + // If the shard is attached as secondary, we need to retry if 404. + retry_if_not_attached = true; + } + // If the shard is attached as primary, we should succeed. + } else { + // Location conf is not available yet, retry if 404. + retry_if_not_attached = true; + } + } else { + // The shard is not attached to the intended pageserver yet, retry if 404. + retry_if_not_attached = true; + } } } targets @@ -4807,6 +4826,18 @@ impl Service { valid_until = Some(lease.valid_until); } } + Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) + if retry_if_not_attached => + { + // This is expected if the attach is not finished yet. Return 503 so that the client can retry. + return Err(ApiError::ResourceUnavailable( + format!( + "Timeline is not attached to the pageserver {} yet, please retry", + node.get_id() + ) + .into(), + )); + } Err(e) => { return Err(passthrough_api_error(&node, e)); }