diff --git a/Cargo.lock b/Cargo.lock index 8a3e3a61eb..d66b383ed1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4506,6 +4506,7 @@ dependencies = [ "pageserver_page_api", "tokio", "tokio-stream", + "tokio-util", "tonic 0.13.1", "tracing", "utils", diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs index 91dedbb42a..6e4df73c0f 100644 --- a/compute_tools/src/metrics.rs +++ b/compute_tools/src/metrics.rs @@ -108,7 +108,7 @@ pub(crate) static LFC_PREWARMS: Lazy = Lazy::new(|| { pub(crate) static LFC_PREWARM_ERRORS: Lazy = Lazy::new(|| { register_int_counter!( "compute_ctl_lfc_prewarm_errors_total", - "Total number of LFC prewarms errors requested by compute_ctl or autoprewarm option", + "Total number of LFC prewarm errors", ) .expect("failed to define a metric") }); @@ -124,7 +124,7 @@ pub(crate) static LFC_OFFLOADS: Lazy = Lazy::new(|| { pub(crate) static LFC_OFFLOAD_ERRORS: Lazy = Lazy::new(|| { register_int_counter!( "compute_ctl_lfc_offload_errors_total", - "Total number of LFC offload errors requested by compute_ctl or lfc_offload_period_seconds option", + "Total number of LFC offload errors", ) .expect("failed to define a metric") }); diff --git a/compute_tools/src/migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql b/compute_tools/src/migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql new file mode 100644 index 0000000000..36e31544be --- /dev/null +++ b/compute_tools/src/migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql @@ -0,0 +1 @@ +GRANT pg_signal_backend TO neon_superuser WITH ADMIN OPTION; diff --git a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql index acb8dd417d..3464a2b1cf 100644 --- a/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql +++ b/compute_tools/src/migrations/tests/0004-grant_pg_monitor_to_neon_superuser.sql @@ -7,13 +7,17 @@ BEGIN INTO monitor FROM pg_auth_members WHERE roleid = 'pg_monitor'::regrole - AND member = 'pg_monitor'::regrole; + AND member = 'neon_superuser'::regrole; - IF NOT monitor.member THEN + IF monitor IS NULL THEN + RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor'; + END IF; + + IF monitor.admin IS NULL OR NOT monitor.member THEN RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor'; END IF; - IF NOT monitor.admin THEN + IF monitor.admin IS NULL OR NOT monitor.admin THEN RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor'; END IF; END $$; diff --git a/compute_tools/src/migrations/tests/0012-grant_pg_signal_backend_to_neon_superuser.sql b/compute_tools/src/migrations/tests/0012-grant_pg_signal_backend_to_neon_superuser.sql new file mode 100644 index 0000000000..e62b742d30 --- /dev/null +++ b/compute_tools/src/migrations/tests/0012-grant_pg_signal_backend_to_neon_superuser.sql @@ -0,0 +1,23 @@ +DO $$ +DECLARE + signal_backend record; +BEGIN + SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member, + admin_option AS admin + INTO signal_backend + FROM pg_auth_members + WHERE roleid = 'pg_signal_backend'::regrole + AND member = 'neon_superuser'::regrole; + + IF signal_backend IS NULL THEN + RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_signal_backend'; + END IF; + + IF signal_backend.member IS NULL OR NOT signal_backend.member THEN + RAISE EXCEPTION 'neon_superuser is not a member of pg_signal_backend'; + END IF; + + IF signal_backend.admin IS NULL OR NOT 
signal_backend.admin THEN + RAISE EXCEPTION 'neon_superuser cannot grant pg_signal_backend'; + END IF; +END $$; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 43cfbb48f7..b6382b2f56 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -197,6 +197,7 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> { include_str!( "./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql" ), + include_str!("./migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql"), ]; MigrationRunner::new(client, &migrations) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index ba13671c93..970ebe1824 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -452,6 +452,12 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'image_creation_threshold' as non zero integer")?, + // HADRON + image_layer_force_creation_period: settings + .remove("image_layer_force_creation_period") + .map(humantime::parse_duration) + .transpose() + .context("Failed to parse 'image_layer_force_creation_period' as duration")?, image_layer_creation_check_threshold: settings .remove("image_layer_creation_check_threshold") .map(|x| x.parse::()) diff --git a/libs/http-utils/src/endpoint.rs b/libs/http-utils/src/endpoint.rs index f32ced1180..a61bf8e08a 100644 --- a/libs/http-utils/src/endpoint.rs +++ b/libs/http-utils/src/endpoint.rs @@ -20,6 +20,7 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::io::ReaderStream; use tracing::{Instrument, debug, info, info_span, warn}; use utils::auth::{AuthError, Claims, SwappableJwtAuth}; +use utils::metrics_collector::{METRICS_COLLECTOR, METRICS_STALE_MILLIS}; use crate::error::{ApiError, api_error_handler, route_error_handler}; use crate::request::{get_query_param, parse_query_param}; @@ -250,9 +251,28 @@ impl std::io::Write for ChannelWriter { } } -pub async fn prometheus_metrics_handler(_req: Request) -> Result, ApiError> { +pub async fn prometheus_metrics_handler( + req: Request, + force_metric_collection_on_scrape: bool, +) -> Result, ApiError> { SERVE_METRICS_COUNT.inc(); + // HADRON + let requested_use_latest = parse_query_param(&req, "use_latest")?; + + let use_latest = match requested_use_latest { + None => force_metric_collection_on_scrape, + Some(true) => true, + Some(false) => { + if force_metric_collection_on_scrape { + // We don't cache in this case + true + } else { + false + } + } + }; + let started_at = std::time::Instant::now(); let (tx, rx) = mpsc::channel(1); @@ -277,12 +297,18 @@ pub async fn prometheus_metrics_handler(_req: Request) -> Result) -> Result { tracing::info!( @@ -303,6 +333,7 @@ pub async fn prometheus_metrics_handler(_req: Request) -> Result, + #[serde(skip_serializing_if = "Option::is_none")] + pub image_layer_generation_large_timeline_threshold: Option, + pub force_metric_collection_on_scrape: bool, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -560,6 +564,11 @@ pub struct TenantConfigToml { pub gc_period: Duration, // Delta layer churn threshold to create L1 image layers. pub image_creation_threshold: usize, + // HADRON + // When the timeout is reached, PageServer will (1) force compact any remaining L0 deltas and + // (2) create image layers if there are any L1 deltas. 
+ #[serde(with = "humantime_serde")] + pub image_layer_force_creation_period: Option, // Determines how much history is retained, to allow // branching and read replicas at an older point in time. // The unit is time. @@ -762,6 +771,7 @@ impl Default for ConfigToml { disk_usage_based_eviction: DiskUsageEvictionTaskConfig::default(), test_remote_failures: (0), + test_remote_failures_probability: (100), ondemand_download_behavior_treat_error_as_warn: (false), @@ -825,6 +835,8 @@ impl Default for ConfigToml { }, basebackup_cache_config: None, posthog_config: None, + image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024), + force_metric_collection_on_scrape: true, } } } @@ -918,6 +930,7 @@ impl Default for TenantConfigToml { gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD) .expect("cannot parse default gc period"), image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD, + image_layer_force_creation_period: None, pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL) .expect("cannot parse default PITR interval"), walreceiver_connect_timeout: humantime::parse_duration( diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index a9b75060e7..c7df4c41c7 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -600,6 +600,9 @@ pub struct TenantConfigPatch { pub gc_period: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub image_creation_threshold: FieldPatch, + // HADRON + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub image_layer_force_creation_period: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub pitr_interval: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] @@ -705,6 +708,11 @@ pub struct TenantConfig { #[serde(skip_serializing_if = "Option::is_none")] pub image_creation_threshold: Option, + // HADRON + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(with = "humantime_serde")] + pub image_layer_force_creation_period: Option, + #[serde(skip_serializing_if = "Option::is_none")] #[serde(with = "humantime_serde")] pub pitr_interval: Option, @@ -807,6 +815,7 @@ impl TenantConfig { mut gc_horizon, mut gc_period, mut image_creation_threshold, + mut image_layer_force_creation_period, mut pitr_interval, mut walreceiver_connect_timeout, mut lagging_wal_timeout, @@ -871,6 +880,11 @@ impl TenantConfig { patch .image_creation_threshold .apply(&mut image_creation_threshold); + // HADRON + patch + .image_layer_force_creation_period + .map(|v| humantime::parse_duration(&v))? + .apply(&mut image_layer_force_creation_period); patch .pitr_interval .map(|v| humantime::parse_duration(&v))? 
@@ -956,6 +970,7 @@ impl TenantConfig { gc_horizon, gc_period, image_creation_threshold, + image_layer_force_creation_period, pitr_interval, walreceiver_connect_timeout, lagging_wal_timeout, @@ -1031,6 +1046,9 @@ impl TenantConfig { image_creation_threshold: self .image_creation_threshold .unwrap_or(global_conf.image_creation_threshold), + image_layer_force_creation_period: self + .image_layer_force_creation_period + .or(global_conf.image_layer_force_creation_period), pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval), walreceiver_connect_timeout: self .walreceiver_connect_timeout diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 69316fd493..0ae13552b8 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -43,6 +43,7 @@ itertools.workspace = true sync_wrapper = { workspace = true, features = ["futures"] } byteorder = "1.4" +rand = "0.8.5" [dev-dependencies] camino-tempfile.workspace = true diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index ed416b2811..5885c3e791 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -732,9 +732,15 @@ impl GenericRemoteStorage { }) } - pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self { - Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first))) + /* BEGIN_HADRON */ + pub fn unreliable_wrapper(s: Self, fail_first: u64, fail_probability: u64) -> Self { + Self::Unreliable(Arc::new(UnreliableWrapper::new( + s, + fail_first, + fail_probability, + ))) } + /* END_HADRON */ /// See [`RemoteStorage::upload`], which this method calls with `None` as metadata. pub async fn upload_storage_object( diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index f9856a5856..30d116f57c 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -1,6 +1,8 @@ //! This module provides a wrapper around a real RemoteStorage implementation that //! causes the first N attempts at each upload or download operatio to fail. For //! testing purposes. +use rand::Rng; +use std::cmp; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::num::NonZeroU32; @@ -25,6 +27,12 @@ pub struct UnreliableWrapper { // Tracks how many failed attempts of each operation has been made. attempts: Mutex>, + + /* BEGIN_HADRON */ + // This the probability of failure for each operation, ranged from [0, 100]. + // The probability is default to 100, which means that all operations will fail. + attempt_failure_probability: u64, + /* END_HADRON */ } /// Used to identify retries of different unique operation. 
@@ -40,7 +48,11 @@ enum RemoteOp { } impl UnreliableWrapper { - pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self { + pub fn new( + inner: crate::GenericRemoteStorage, + attempts_to_fail: u64, + attempt_failure_probability: u64, + ) -> Self { assert!(attempts_to_fail > 0); let inner = match inner { GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s), @@ -51,9 +63,11 @@ impl UnreliableWrapper { panic!("Can't wrap unreliable wrapper unreliably") } }; + let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100); UnreliableWrapper { inner, attempts_to_fail, + attempt_failure_probability: actual_attempt_failure_probability, attempts: Mutex::new(HashMap::new()), } } @@ -66,6 +80,7 @@ impl UnreliableWrapper { /// fn attempt(&self, op: RemoteOp) -> anyhow::Result { let mut attempts = self.attempts.lock().unwrap(); + let mut rng = rand::thread_rng(); match attempts.entry(op) { Entry::Occupied(mut e) => { @@ -75,15 +90,19 @@ impl UnreliableWrapper { *p }; - if attempts_before_this >= self.attempts_to_fail { - // let it succeed - e.remove(); - Ok(attempts_before_this) - } else { + /* BEGIN_HADRON */ + // If there are more attempts to fail, fail the request by probability. + if (attempts_before_this < self.attempts_to_fail) + && (rng.gen_range(0..=100) < self.attempt_failure_probability) + { let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key()); Err(error) + } else { + e.remove(); + Ok(attempts_before_this) } + /* END_HADRON */ } Entry::Vacant(e) => { let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key()); diff --git a/libs/utils/src/env.rs b/libs/utils/src/env.rs index 2a85f54a01..cc1cbf8009 100644 --- a/libs/utils/src/env.rs +++ b/libs/utils/src/env.rs @@ -44,3 +44,62 @@ where } } } + +/* BEGIN_HADRON */ +pub enum DeploymentMode { + Dev, + Staging, + Prod, +} + +pub fn get_deployment_mode() -> Option { + match std::env::var("DEPLOYMENT_MODE") { + Ok(env) => match env.as_str() { + "development" => Some(DeploymentMode::Dev), + "staging" => Some(DeploymentMode::Staging), + "production" => Some(DeploymentMode::Prod), + _ => { + tracing::error!("Unexpected DEPLOYMENT_MODE: {}", env); + None + } + }, + Err(_) => { + tracing::error!("DEPLOYMENT_MODE not set"); + None + } + } +} + +pub fn is_dev_or_staging() -> bool { + matches!( + get_deployment_mode(), + Some(DeploymentMode::Dev) | Some(DeploymentMode::Staging) + ) +} + +pub enum TestingMode { + Chaos, + Stress, +} + +pub fn get_test_mode() -> Option { + match std::env::var("HADRON_TEST_MODE") { + Ok(env) => match env.as_str() { + "chaos" => Some(TestingMode::Chaos), + "stress" => Some(TestingMode::Stress), + _ => { + tracing::error!("Unexpected HADRON_TEST_MODE: {}", env); + None + } + }, + Err(_) => { + tracing::error!("HADRON_TEST_MODE not set"); + None + } + } +} + +pub fn is_chaos_testing() -> bool { + matches!(get_test_mode(), Some(TestingMode::Chaos)) +} +/* END_HADRON */ diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 11f787562c..2b81da017d 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -99,6 +99,8 @@ pub mod elapsed_accum; #[cfg(target_os = "linux")] pub mod linux_socket_ioctl; +pub mod metrics_collector; + // Re-export used in macro. Avoids adding git-version as dep in target crates. 
#[doc(hidden)] pub use git_version; diff --git a/libs/utils/src/metrics_collector.rs b/libs/utils/src/metrics_collector.rs new file mode 100644 index 0000000000..9e57fcd643 --- /dev/null +++ b/libs/utils/src/metrics_collector.rs @@ -0,0 +1,75 @@ +use std::{ + sync::{Arc, RwLock}, + time::{Duration, Instant}, +}; + +use metrics::{IntGauge, proto::MetricFamily, register_int_gauge}; +use once_cell::sync::Lazy; + +pub static METRICS_STALE_MILLIS: Lazy = Lazy::new(|| { + register_int_gauge!( + "metrics_metrics_stale_milliseconds", + "The current metrics stale time in milliseconds" + ) + .expect("failed to define a metric") +}); + +#[derive(Debug)] +pub struct CollectedMetrics { + pub metrics: Vec, + pub collected_at: Instant, +} + +impl CollectedMetrics { + fn new(metrics: Vec) -> Self { + Self { + metrics, + collected_at: Instant::now(), + } + } +} + +#[derive(Debug)] +pub struct MetricsCollector { + last_collected: RwLock>, +} + +impl MetricsCollector { + pub fn new() -> Self { + Self { + last_collected: RwLock::new(Arc::new(CollectedMetrics::new(vec![]))), + } + } + + #[tracing::instrument(name = "metrics_collector", skip_all)] + pub fn run_once(&self, cache_metrics: bool) -> Arc { + let started = Instant::now(); + let metrics = metrics::gather(); + let collected = Arc::new(CollectedMetrics::new(metrics)); + if cache_metrics { + let mut guard = self.last_collected.write().unwrap(); + *guard = collected.clone(); + } + tracing::info!( + "Collected {} metric families in {} ms", + collected.metrics.len(), + started.elapsed().as_millis() + ); + collected + } + + pub fn last_collected(&self) -> Arc { + self.last_collected.read().unwrap().clone() + } +} + +impl Default for MetricsCollector { + fn default() -> Self { + Self::new() + } +} + +// Interval for metrics collection. Currently hard-coded to be the same as the metrics scape interval from the obs agent +pub static METRICS_COLLECTION_INTERVAL: Duration = Duration::from_secs(30); + +pub static METRICS_COLLECTOR: Lazy = Lazy::new(MetricsCollector::default); diff --git a/pageserver/client_grpc/Cargo.toml b/pageserver/client_grpc/Cargo.toml index 84e27abb84..ca224900ac 100644 --- a/pageserver/client_grpc/Cargo.toml +++ b/pageserver/client_grpc/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" edition.workspace = true license.workspace = true +[features] +testing = ["pageserver_api/testing"] + [dependencies] anyhow.workspace = true bytes.workspace = true @@ -13,6 +16,7 @@ pageserver_api.workspace = true pageserver_page_api.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true tonic.workspace = true tracing.workspace = true utils.workspace = true diff --git a/pageserver/client_grpc/src/client.rs b/pageserver/client_grpc/src/client.rs index c21ce2e47d..63852868c3 100644 --- a/pageserver/client_grpc/src/client.rs +++ b/pageserver/client_grpc/src/client.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::num::NonZero; use std::sync::Arc; use anyhow::anyhow; @@ -15,6 +16,32 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::{ShardCount, ShardIndex, ShardNumber}; +/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up +/// when full. +/// +/// TODO: tune all of these constants, and consider making them configurable. +/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels +/// with only streams. 
+const MAX_CLIENTS_PER_CHANNEL: NonZero = NonZero::new(16).unwrap(); + +/// Max number of concurrent unary request clients per shard. +const MAX_UNARY_CLIENTS: NonZero = NonZero::new(64).unwrap(); + +/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage +/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`. +const MAX_STREAMS: NonZero = NonZero::new(64).unwrap(); + +/// Max number of pipelined requests per stream. +const MAX_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(2).unwrap(); + +/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these +/// are more throughput-oriented, we have a smaller limit but higher queue depth. +const MAX_BULK_STREAMS: NonZero = NonZero::new(16).unwrap(); + +/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus +/// get a larger queue depth. +const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero = NonZero::new(4).unwrap(); + /// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the /// basic `page_api::Client` gRPC client, and supports: /// @@ -87,6 +114,7 @@ impl PageserverClient { /// errors. All responses will have `GetPageStatusCode::Ok`. #[instrument(skip_all, fields( req_id = %req.request_id, + class = %req.request_class, rel = %req.rel, blkno = %req.block_numbers[0], blks = %req.block_numbers.len(), @@ -141,7 +169,11 @@ impl PageserverClient { let resp = self .retry .with(async || { - let stream = self.shards.get(shard_id)?.stream().await; + let stream = self + .shards + .get(shard_id)? + .stream(req.request_class.is_bulk()) + .await; let resp = stream.send(req.clone()).await?; // Convert per-request errors into a tonic::Status. @@ -270,17 +302,22 @@ impl Shards { } } -/// A single shard. +/// A single shard. Uses dedicated resource pools with the following structure: /// -/// TODO: consider separate pools for normal and bulk traffic, with different settings. +/// * Channel pool: unbounded. +/// * Unary client pool: MAX_UNARY_CLIENTS. +/// * Stream client pool: unbounded. +/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH. +/// * Bulk channel pool: unbounded. +/// * Bulk client pool: unbounded. +/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH. struct Shard { - /// Dedicated channel pool for this shard. Shared by all clients/streams in this shard. - _channel_pool: Arc, - /// Unary gRPC client pool for this shard. Uses the shared channel pool. + /// Unary gRPC client pool. client_pool: Arc, - /// GetPage stream pool for this shard. Uses a dedicated client pool, but shares the channel - /// pool with unary clients. + /// GetPage stream pool. stream_pool: Arc, + /// GetPage stream pool for bulk requests, e.g. prefetches. + bulk_stream_pool: Arc, } impl Shard { @@ -297,34 +334,53 @@ impl Shard { return Err(anyhow!("invalid shard URL {url}: must use gRPC")); } - // Use a common channel pool for all clients, to multiplex unary and stream requests across - // the same TCP connections. The channel pool is unbounded (but client pools are bounded). - let channel_pool = ChannelPool::new(url)?; + // Common channel pool for unary and stream requests. Bounded by client/stream pools. + let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?; - // Dedicated client pool for unary requests. + // Client pool for unary requests. 
let client_pool = ClientPool::new( channel_pool.clone(), tenant_id, timeline_id, shard_id, auth_token.clone(), + Some(MAX_UNARY_CLIENTS), ); - // Stream pool with dedicated client pool. If this shared a client pool with unary requests, - // long-lived streams could fill up the client pool and starve out unary requests. It shares - // the same underlying channel pool with unary clients though, which is unbounded. - let stream_pool = StreamPool::new(ClientPool::new( - channel_pool.clone(), - tenant_id, - timeline_id, - shard_id, - auth_token, - )); + // GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients, + // but shares a channel pool with it (as it's unbounded). + let stream_pool = StreamPool::new( + ClientPool::new( + channel_pool.clone(), + tenant_id, + timeline_id, + shard_id, + auth_token.clone(), + None, // unbounded, limited by stream pool + ), + Some(MAX_STREAMS), + MAX_STREAM_QUEUE_DEPTH, + ); + + // Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools + // to avoid head-of-line blocking of latency-sensitive requests. + let bulk_stream_pool = StreamPool::new( + ClientPool::new( + ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?, + tenant_id, + timeline_id, + shard_id, + auth_token, + None, // unbounded, limited by stream pool + ), + Some(MAX_BULK_STREAMS), + MAX_BULK_STREAM_QUEUE_DEPTH, + ); Ok(Self { - _channel_pool: channel_pool, client_pool, stream_pool, + bulk_stream_pool, }) } @@ -336,8 +392,12 @@ impl Shard { .map_err(|err| tonic::Status::internal(format!("failed to get client: {err}"))) } - /// Returns a pooled stream for this shard. - async fn stream(&self) -> StreamGuard { - self.stream_pool.get().await + /// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream + /// pool (e.g. for prefetches). + async fn stream(&self, bulk: bool) -> StreamGuard { + match bulk { + false => self.stream_pool.get().await, + true => self.bulk_stream_pool.get().await, + } } } diff --git a/pageserver/client_grpc/src/pool.rs b/pageserver/client_grpc/src/pool.rs index 518e4e5b84..89b3bd646f 100644 --- a/pageserver/client_grpc/src/pool.rs +++ b/pageserver/client_grpc/src/pool.rs @@ -30,13 +30,16 @@ //! TODO: observability. use std::collections::{BTreeMap, HashMap}; +use std::num::NonZero; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; +use std::time::{Duration, Instant}; use futures::StreamExt as _; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; +use tokio_util::sync::CancellationToken; use tonic::transport::{Channel, Endpoint}; use tracing::{error, warn}; @@ -44,33 +47,43 @@ use pageserver_page_api as page_api; use utils::id::{TenantId, TimelineId}; use utils::shard::ShardIndex; -/// Max number of concurrent clients per channel. +/// Reap channels/clients/streams that have been idle for this long. /// -/// TODO: tune these constants, and make them configurable. -/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels -/// with only streams. -const CLIENTS_PER_CHANNEL: usize = 16; +/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to +/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle. +/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. 
Then, we +/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to +/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty +/// channels, and/or stream pool clients. +const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) { + false => Duration::from_secs(180), + true => Duration::from_secs(1), // exercise reaping in tests +}; -/// Maximum number of concurrent clients per `ClientPool`. -const CLIENT_LIMIT: usize = 64; - -/// Max number of pipelined requests per gRPC GetPage stream. -const STREAM_QUEUE_DEPTH: usize = 2; +/// Reap idle resources with this interval. +const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) { + false => Duration::from_secs(10), + true => Duration::from_secs(1), // exercise reaping in tests +}; /// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2 -/// stream multiplexing), up to `CLIENTS_PER_CHANNEL`. The pool does not limit the number of -/// channels, and instead relies on `ClientPool` to limit the number of concurrent clients. +/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this. +/// The pool does not limit the number of channels, and instead relies on `ClientPool` or +/// `StreamPool` to limit the number of concurrent clients. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. /// -/// TODO: reap idle channels. /// TODO: consider prewarming a set of channels, to avoid initial connection latency. /// TODO: consider adding a circuit breaker for errors and fail fast. pub struct ChannelPool { /// Pageserver endpoint to connect to. endpoint: Endpoint, + /// Max number of clients per channel. Beyond this, a new channel will be created. + max_clients_per_channel: NonZero, /// Open channels. channels: Mutex>, + /// Reaps idle channels. + idle_reaper: Reaper, /// Channel ID generator. next_channel_id: AtomicUsize, } @@ -82,20 +95,27 @@ struct ChannelEntry { channel: Channel, /// Number of clients using this channel. clients: usize, + /// The channel has been idle (no clients) since this time. None if channel is in use. + /// INVARIANT: Some if clients == 0, otherwise None. + idle_since: Option, } impl ChannelPool { /// Creates a new channel pool for the given Pageserver endpoint. - pub fn new(endpoint: E) -> anyhow::Result> + pub fn new(endpoint: E, max_clients_per_channel: NonZero) -> anyhow::Result> where E: TryInto + Send + Sync + 'static, >::Error: std::error::Error + Send + Sync, { - Ok(Arc::new(Self { + let pool = Arc::new(Self { endpoint: endpoint.try_into()?, + max_clients_per_channel, channels: Mutex::default(), + idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), next_channel_id: AtomicUsize::default(), - })) + }); + pool.idle_reaper.spawn(&pool); + Ok(pool) } /// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel. @@ -120,9 +140,18 @@ impl ChannelPool { // with lower-ordered channel IDs first. This will cluster clients in lower-ordered // channels, and free up higher-ordered channels such that they can be reaped. 
for (&id, entry) in channels.iter_mut() { - assert!(entry.clients <= CLIENTS_PER_CHANNEL, "channel overflow"); - if entry.clients < CLIENTS_PER_CHANNEL { + assert!( + entry.clients <= self.max_clients_per_channel.get(), + "channel overflow" + ); + assert_eq!( + entry.idle_since.is_some(), + entry.clients == 0, + "incorrect channel idle state" + ); + if entry.clients < self.max_clients_per_channel.get() { entry.clients += 1; + entry.idle_since = None; return ChannelGuard { pool: Arc::downgrade(self), id, @@ -139,6 +168,7 @@ impl ChannelPool { let entry = ChannelEntry { channel: channel.clone(), clients: 1, // account for the guard below + idle_since: None, }; channels.insert(id, entry); @@ -150,6 +180,20 @@ impl ChannelPool { } } +impl Reapable for ChannelPool { + /// Reaps channels that have been idle since before the cutoff. + fn reap_idle(&self, cutoff: Instant) { + self.channels.lock().unwrap().retain(|_, entry| { + let Some(idle_since) = entry.idle_since else { + assert_ne!(entry.clients, 0, "empty channel not marked idle"); + return true; + }; + assert_eq!(entry.clients, 0, "idle channel has clients"); + idle_since >= cutoff + }) + } +} + /// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`, /// since the gRPC client requires an owned `Channel`. pub struct ChannelGuard { @@ -172,20 +216,23 @@ impl Drop for ChannelGuard { let Some(pool) = self.pool.upgrade() else { return; // pool was dropped }; + let mut channels = pool.channels.lock().unwrap(); let entry = channels.get_mut(&self.id).expect("unknown channel"); + assert!(entry.idle_since.is_none(), "active channel marked idle"); assert!(entry.clients > 0, "channel underflow"); entry.clients -= 1; + if entry.clients == 0 { + entry.idle_since = Some(Instant::now()); // mark channel as idle + } } } /// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner /// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total -/// number of concurrent clients to `CLIENT_LIMIT` via semaphore. +/// number of concurrent clients to `max_clients` via semaphore. /// /// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads. -/// -/// TODO: reap idle clients. pub struct ClientPool { /// Tenant ID. tenant_id: TenantId, @@ -197,8 +244,8 @@ pub struct ClientPool { auth_token: Option, /// Channel pool to acquire channels from. channel_pool: Arc, - /// Limits the max number of concurrent clients for this pool. - limiter: Arc, + /// Limits the max number of concurrent clients for this pool. None if the pool is unbounded. + limiter: Option>, /// Idle pooled clients. Acquired clients are removed from here and returned on drop. /// /// The first client in the map will be acquired next. The map is sorted by client ID, which in @@ -206,6 +253,8 @@ pub struct ClientPool { /// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle /// clients are reaped. idle: Mutex>, + /// Reaps idle clients. + idle_reaper: Reaper, /// Unique client ID generator. next_client_id: AtomicUsize, } @@ -217,44 +266,51 @@ struct ClientEntry { client: page_api::Client, /// The channel guard for the channel used by the client. channel_guard: ChannelGuard, + /// The client has been idle since this time. All clients in `ClientPool::idle` are idle by + /// definition, so this is the time when it was added back to the pool. 
+ idle_since: Instant, } impl ClientPool { /// Creates a new client pool for the given tenant shard. Channels are acquired from the given - /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. + /// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to + /// `max_clients` concurrent clients, or unbounded if None. pub fn new( channel_pool: Arc, tenant_id: TenantId, timeline_id: TimelineId, shard_id: ShardIndex, auth_token: Option, + max_clients: Option>, ) -> Arc { - Arc::new(Self { + let pool = Arc::new(Self { tenant_id, timeline_id, shard_id, auth_token, channel_pool, idle: Mutex::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT)), + idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), + limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))), next_client_id: AtomicUsize::default(), - }) + }); + pool.idle_reaper.spawn(&pool); + pool } /// Gets a client from the pool, or creates a new one if necessary. Connections are established - /// lazily and do not block, but this call can block if the pool is at `CLIENT_LIMIT`. The - /// client is returned to the pool when the guard is dropped. + /// lazily and do not block, but this call can block if the pool is at `max_clients`. The client + /// is returned to the pool when the guard is dropped. /// /// This is moderately performance-sensitive. It is called for every unary request, but these /// establish a new gRPC stream per request so they're already expensive. GetPage requests use /// the `StreamPool` instead. pub async fn get(self: &Arc) -> anyhow::Result { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } // Fast path: acquire an idle client from the pool. if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() { @@ -291,14 +347,24 @@ impl ClientPool { } } +impl Reapable for ClientPool { + /// Reaps clients that have been idle since before the cutoff. + fn reap_idle(&self, cutoff: Instant) { + self.idle + .lock() + .unwrap() + .retain(|_, entry| entry.idle_since >= cutoff) + } +} + /// A client acquired from the pool. The inner client can be accessed via Deref. The client is /// returned to the pool when dropped. pub struct ClientGuard { pool: Weak, id: ClientID, - client: Option, // Some until dropped - channel_guard: Option, // Some until dropped - permit: OwnedSemaphorePermit, + client: Option, // Some until dropped + channel_guard: Option, // Some until dropped + permit: Option, // None if pool is unbounded } impl Deref for ClientGuard { @@ -321,9 +387,11 @@ impl Drop for ClientGuard { let Some(pool) = self.pool.upgrade() else { return; // pool was dropped }; + let entry = ClientEntry { client: self.client.take().expect("dropped once"), channel_guard: self.channel_guard.take().expect("dropped once"), + idle_since: Instant::now(), }; pool.idle.lock().unwrap().insert(self.id, entry); @@ -338,19 +406,25 @@ impl Drop for ClientGuard { /// a single request and await the response. Internally, requests are multiplexed across streams and /// channels. This allows proper queue depth enforcement and response routing. /// -/// TODO: reap idle streams. /// TODO: consider making this generic over request and response types; not currently needed. 
pub struct StreamPool { - /// The client pool to acquire clients from. + /// The client pool to acquire clients from. Must be unbounded. client_pool: Arc, /// All pooled streams. /// /// Incoming requests will be sent over an existing stream with available capacity. If all - /// streams are full, a new one is spun up and added to the pool (up to the `ClientPool` limit). - /// Each stream has an associated Tokio task that processes requests and responses. - streams: Arc>>, - /// Limits the max number of concurrent requests (not streams). - limiter: Arc, + /// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each + /// stream has an associated Tokio task that processes requests and responses. + streams: Mutex>, + /// The max number of concurrent streams, or None if unbounded. + max_streams: Option>, + /// The max number of concurrent requests per stream. + max_queue_depth: NonZero, + /// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`. + /// None if the pool is unbounded. + limiter: Option>, + /// Reaps idle streams. + idle_reaper: Reaper, /// Stream ID generator. next_stream_id: AtomicUsize, } @@ -363,24 +437,40 @@ type ResponseSender = oneshot::Sender>; struct StreamEntry { /// Sends caller requests to the stream task. The stream task exits when this is dropped. sender: RequestSender, - /// Number of in-flight requests on this stream. This is an atomic to allow decrementing it on - /// completion without acquiring the `StreamPool::streams` lock. - queue_depth: Arc, + /// Number of in-flight requests on this stream. + queue_depth: usize, + /// The time when this stream went idle (queue_depth == 0). + /// INVARIANT: Some if queue_depth == 0, otherwise None. + idle_since: Option, } impl StreamPool { - /// Creates a new stream pool, using the given client pool. + /// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth` + /// concurrent requests on each stream, and use up to `max_streams` concurrent streams. /// - /// NB: the stream pool should use a dedicated client pool. Otherwise, long-lived streams may - /// fill up the client pool and starve out unary requests. Client pools can share the same - /// `ChannelPool` though, since the channel pool is unbounded. - pub fn new(client_pool: Arc) -> Arc { - Arc::new(Self { + /// The client pool must be unbounded. The stream pool will enforce its own limits, and because + /// streams are long-lived they can cause persistent starvation if they exhaust the client pool. + /// The stream pool should generally have its own dedicated client pool (but it can share a + /// channel pool with others since these are always unbounded). 
+ pub fn new( + client_pool: Arc, + max_streams: Option>, + max_queue_depth: NonZero, + ) -> Arc { + assert!(client_pool.limiter.is_none(), "bounded client pool"); + let pool = Arc::new(Self { client_pool, - streams: Arc::default(), - limiter: Arc::new(Semaphore::new(CLIENT_LIMIT * STREAM_QUEUE_DEPTH)), + streams: Mutex::default(), + limiter: max_streams.map(|max_streams| { + Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get())) + }), + max_streams, + max_queue_depth, + idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL), next_stream_id: AtomicUsize::default(), - }) + }); + pool.idle_reaper.spawn(&pool); + pool } /// Acquires an available stream from the pool, or spins up a new stream async if all streams @@ -400,34 +490,33 @@ impl StreamPool { /// * Allow concurrent clients to join onto streams while they're spun up. /// * Allow spinning up multiple streams concurrently, but don't overshoot limits. /// - /// For now, we just do something simple and functional, but very inefficient (linear scan). - pub async fn get(&self) -> StreamGuard { - let permit = self - .limiter - .clone() - .acquire_owned() - .await - .expect("never closed"); + /// For now, we just do something simple but inefficient (linear scan under mutex). + pub async fn get(self: &Arc) -> StreamGuard { + // Acquire a permit if the pool is bounded. + let mut permit = None; + if let Some(limiter) = self.limiter.clone() { + permit = Some(limiter.acquire_owned().await.expect("never closed")); + } let mut streams = self.streams.lock().unwrap(); // Look for a pooled stream with available capacity. - for entry in streams.values() { + for (&id, entry) in streams.iter_mut() { assert!( - entry.queue_depth.load(Ordering::Relaxed) <= STREAM_QUEUE_DEPTH, + entry.queue_depth <= self.max_queue_depth.get(), "stream queue overflow" ); - if entry - .queue_depth - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| { - // Increment the queue depth via compare-and-swap. - // TODO: review ordering. - (queue_depth < STREAM_QUEUE_DEPTH).then_some(queue_depth + 1) - }) - .is_ok() - { + assert_eq!( + entry.idle_since.is_some(), + entry.queue_depth == 0, + "incorrect stream idle state" + ); + if entry.queue_depth < self.max_queue_depth.get() { + entry.queue_depth += 1; + entry.idle_since = None; return StreamGuard { + pool: Arc::downgrade(self), + id, sender: entry.sender.clone(), - queue_depth: entry.queue_depth.clone(), permit, }; } @@ -437,35 +526,36 @@ impl StreamPool { // return the guard, while spinning up the stream task async. This allows other callers to // join onto this stream and also create additional streams concurrently if this fills up. let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed); - let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); + let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get()); let entry = StreamEntry { sender: req_tx.clone(), - queue_depth: queue_depth.clone(), + queue_depth: 1, // reserve quota for this caller + idle_since: None, }; streams.insert(id, entry); - // NB: make sure we don't overshoot the client limit. The semaphore limit is CLIENT_LIMIT * - // STREAM_QUEUE_DEPTH, but if we were to misaccount queue depth we'd try to spin up more - // streams than CLIENT_LIMIT and block on the client pool ~forever. This should not happen - // because we only acquire queue depth under lock and after acquiring a semaphore permit. 
- assert!(streams.len() <= CLIENT_LIMIT, "stream overflow"); + if let Some(max_streams) = self.max_streams { + assert!(streams.len() <= max_streams.get(), "stream overflow"); + }; let client_pool = self.client_pool.clone(); - let streams = self.streams.clone(); + let pool = Arc::downgrade(self); tokio::spawn(async move { if let Err(err) = Self::run_stream(client_pool, req_rx).await { error!("stream failed: {err}"); } - // Remove stream from pool on exit. - let entry = streams.lock().unwrap().remove(&id); - assert!(entry.is_some(), "unknown stream ID: {id}"); + // Remove stream from pool on exit. Weak reference to avoid holding the pool alive. + if let Some(pool) = pool.upgrade() { + let entry = pool.streams.lock().unwrap().remove(&id); + assert!(entry.is_some(), "unknown stream ID: {id}"); + } }); StreamGuard { + pool: Arc::downgrade(self), + id, sender: req_tx, - queue_depth, permit, } } @@ -484,19 +574,22 @@ impl StreamPool { // Acquire a client from the pool and create a stream. let mut client = client_pool.get().await?; - let (req_tx, req_rx) = mpsc::channel(STREAM_QUEUE_DEPTH); - let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); + // NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could + // theoretically deadlock if both the client and server block on sends (since we're not + // reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and + // low queue depths, but it was seen to happen with the libpq protocol so better safe than + // sorry. It should never buffer more than the queue depth anyway, but using an unbounded + // channel guarantees that it will never block. + let (req_tx, req_rx) = mpsc::unbounded_channel(); + let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx); let mut resp_stream = client.get_pages(req_stream).await?; // Track caller response channels by request ID. If the task returns early, these response // channels will be dropped and the waiting callers will receive an error. - let mut callers = HashMap::with_capacity(STREAM_QUEUE_DEPTH); + let mut callers = HashMap::new(); // Process requests and responses. loop { - // NB: this can trip if the server doesn't respond to a request, so only debug_assert. - debug_assert!(callers.len() <= STREAM_QUEUE_DEPTH, "stream queue overflow"); - tokio::select! { // Receive requests from callers and send them to the stream. req = caller_rx.recv() => { @@ -515,8 +608,8 @@ impl StreamPool { } callers.insert(req.request_id, resp_tx); - // Send the request on the stream. Bail out if the send fails. - req_tx.send(req).await.map_err(|_| { + // Send the request on the stream. Bail out if the stream is closed. + req_tx.send(req).map_err(|_| { tonic::Status::unavailable("stream closed") })?; } @@ -540,12 +633,27 @@ impl StreamPool { } } +impl Reapable for StreamPool { + /// Reaps streams that have been idle since before the cutoff. + fn reap_idle(&self, cutoff: Instant) { + self.streams.lock().unwrap().retain(|_, entry| { + let Some(idle_since) = entry.idle_since else { + assert_ne!(entry.queue_depth, 0, "empty stream not marked idle"); + return true; + }; + assert_eq!(entry.queue_depth, 0, "idle stream has requests"); + idle_since >= cutoff + }); + } +} + /// A pooled stream reference. Can be used to send a single request, to properly enforce queue /// depth. Queue depth is already reserved and will be returned on drop. 
pub struct StreamGuard { + pool: Weak, + id: StreamID, sender: RequestSender, - queue_depth: Arc, - permit: OwnedSemaphorePermit, + permit: Option, // None if pool is unbounded } impl StreamGuard { @@ -576,11 +684,78 @@ impl StreamGuard { impl Drop for StreamGuard { fn drop(&mut self) { + let Some(pool) = self.pool.upgrade() else { + return; // pool was dropped + }; + // Release the queue depth reservation on drop. This can prematurely decrement it if dropped // before the response is received, but that's okay. - let prev_queue_depth = self.queue_depth.fetch_sub(1, Ordering::SeqCst); - assert!(prev_queue_depth > 0, "stream queue underflow"); + let mut streams = pool.streams.lock().unwrap(); + let entry = streams.get_mut(&self.id).expect("unknown stream"); + assert!(entry.idle_since.is_none(), "active stream marked idle"); + assert!(entry.queue_depth > 0, "stream queue underflow"); + entry.queue_depth -= 1; + if entry.queue_depth == 0 { + entry.idle_since = Some(Instant::now()); // mark stream as idle + } _ = self.permit; // returned on drop, referenced for visibility } } + +/// Periodically reaps idle resources from a pool. +struct Reaper { + /// The task check interval. + interval: Duration, + /// The threshold for reaping idle resources. + threshold: Duration, + /// Cancels the reaper task. Cancelled when the reaper is dropped. + cancel: CancellationToken, +} + +impl Reaper { + /// Creates a new reaper. + pub fn new(threshold: Duration, interval: Duration) -> Self { + Self { + cancel: CancellationToken::new(), + threshold, + interval, + } + } + + /// Spawns a task to periodically reap idle resources from the given task pool. The task is + /// cancelled when the reaper is dropped. + pub fn spawn(&self, pool: &Arc) { + // NB: hold a weak pool reference, otherwise the task will prevent dropping the pool. + let pool = Arc::downgrade(pool); + let cancel = self.cancel.clone(); + let (interval, threshold) = (self.interval, self.threshold); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = tokio::time::sleep(interval) => { + let Some(pool) = pool.upgrade() else { + return; // pool was dropped + }; + pool.reap_idle(Instant::now() - threshold); + } + + _ = cancel.cancelled() => return, + } + } + }); + } +} + +impl Drop for Reaper { + fn drop(&mut self) { + self.cancel.cancel(); // cancel reaper task + } +} + +/// A reapable resource pool. +trait Reapable: Send + Sync + 'static { + /// Reaps resources that have been idle since before the given cutoff. + fn reap_idle(&self, cutoff: Instant); +} diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 3c10a410d2..c089a4f1d8 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -387,7 +387,7 @@ impl From for proto::GetPageRequest { pub type RequestID = u64; /// A GetPage request class. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, strum_macros::Display)] pub enum GetPageClass { /// Unknown class. For backwards compatibility: used when an older client version sends a class /// that a newer server version has removed. @@ -400,6 +400,19 @@ pub enum GetPageClass { Background, } +impl GetPageClass { + /// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than + /// latency-sensitive). 
+ pub fn is_bulk(&self) -> bool { + match self { + Self::Unknown => false, + Self::Normal => false, + Self::Prefetch => true, + Self::Background => true, + } + } +} + impl From for GetPageClass { fn from(pb: proto::GetPageClass) -> Self { match pb { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 327384fd82..299fe7e159 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -29,8 +29,8 @@ use pageserver::task_mgr::{ }; use pageserver::tenant::{TenantSharedResources, mgr, secondary}; use pageserver::{ - CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http, - page_cache, page_service, task_mgr, virtual_file, + CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, + MetricsCollectionTask, http, page_cache, page_service, task_mgr, virtual_file, }; use postgres_backend::AuthType; use remote_storage::GenericRemoteStorage; @@ -41,6 +41,7 @@ use tracing_utils::OtelGuard; use utils::auth::{JwtAuth, SwappableJwtAuth}; use utils::crashsafe::syncfs; use utils::logging::TracingErrorLayerEnablement; +use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR}; use utils::sentry_init::init_sentry; use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener}; @@ -763,6 +764,41 @@ fn start_pageserver( (http_task, https_task) }; + /* BEGIN_HADRON */ + let metrics_collection_task = { + let cancel = shutdown_pageserver.child_token(); + let task = crate::BACKGROUND_RUNTIME.spawn({ + let cancel = cancel.clone(); + let background_jobs_barrier = background_jobs_barrier.clone(); + async move { + if conf.force_metric_collection_on_scrape { + return; + } + + // first wait until background jobs are cleared to launch. + tokio::select! { + _ = cancel.cancelled() => { return; }, + _ = background_jobs_barrier.wait() => {} + }; + let mut interval = tokio::time::interval(METRICS_COLLECTION_INTERVAL); + loop { + tokio::select! { + _ = cancel.cancelled() => { + tracing::info!("cancelled metrics collection task, exiting..."); + break; + }, + _ = interval.tick() => {} + } + tokio::task::spawn_blocking(|| { + METRICS_COLLECTOR.run_once(true); + }); + } + } + }); + MetricsCollectionTask(CancellableTask { task, cancel }) + }; + /* END_HADRON */ + let consumption_metrics_tasks = { let cancel = shutdown_pageserver.child_token(); let task = crate::BACKGROUND_RUNTIME.spawn({ @@ -844,6 +880,7 @@ fn start_pageserver( https_endpoint_listener, page_service, page_service_grpc, + metrics_collection_task, consumption_metrics_tasks, disk_usage_eviction_task, &tenant_manager, @@ -889,8 +926,11 @@ async fn create_remote_storage_client( "Simulating remote failures for first {} attempts of each op", conf.test_remote_failures ); - remote_storage = - GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures); + remote_storage = GenericRemoteStorage::unreliable_wrapper( + remote_storage, + conf.test_remote_failures, + conf.test_remote_failures_probability, + ); } Ok(remote_storage) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 9448b9e655..d8fd1e95b8 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -147,7 +147,11 @@ pub struct PageServerConf { pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig, + // The number of allowed failures in remote storage operations. pub test_remote_failures: u64, + // The probability of failure in remote storage operations. 
Only works when test_remote_failures > 1. + // Use 100 for 100% failure, 0 for no failure. + pub test_remote_failures_probability: u64, pub ondemand_download_behavior_treat_error_as_warn: bool, @@ -248,6 +252,14 @@ pub struct PageServerConf { pub timeline_import_config: pageserver_api::config::TimelineImportConfig, pub basebackup_cache_config: Option, + + /// Defines what is a big tenant for the purpose of image layer generation. + /// See Timeline::should_check_if_image_layers_required + pub image_layer_generation_large_timeline_threshold: Option, + + /// Controls whether to collect all metrics on each scrape or to return potentially stale + /// results. + pub force_metric_collection_on_scrape: bool, } /// Token for authentication to safekeepers @@ -392,6 +404,7 @@ impl PageServerConf { synthetic_size_calculation_interval, disk_usage_based_eviction, test_remote_failures, + test_remote_failures_probability, ondemand_download_behavior_treat_error_as_warn, background_task_maximum_delay, control_plane_api, @@ -427,6 +440,8 @@ impl PageServerConf { posthog_config, timeline_import_config, basebackup_cache_config, + image_layer_generation_large_timeline_threshold, + force_metric_collection_on_scrape, } = config_toml; let mut conf = PageServerConf { @@ -461,6 +476,7 @@ impl PageServerConf { synthetic_size_calculation_interval, disk_usage_based_eviction, test_remote_failures, + test_remote_failures_probability, ondemand_download_behavior_treat_error_as_warn, background_task_maximum_delay, control_plane_api: control_plane_api @@ -484,6 +500,8 @@ impl PageServerConf { dev_mode, timeline_import_config, basebackup_cache_config, + image_layer_generation_large_timeline_threshold, + force_metric_collection_on_scrape, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c888b616c7..c04c5af17f 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -79,8 +79,8 @@ use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerNa use crate::tenant::timeline::layer_manager::LayerManagerLockHolder; use crate::tenant::timeline::offload::{OffloadError, offload_timeline}; use crate::tenant::timeline::{ - CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline, - WaitLsnTimeout, WaitLsnWaiter, import_pgdata, + CompactFlags, CompactOptions, CompactRequest, MarkInvisibleRequest, Timeline, WaitLsnTimeout, + WaitLsnWaiter, import_pgdata, }; use crate::tenant::{ GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError, @@ -2503,9 +2503,10 @@ async fn timeline_checkpoint_handler( .compact(&cancel, flags, &ctx) .await .map_err(|e| - match e { - CompactionError::ShuttingDown => ApiError::ShuttingDown, - CompactionError::Other(e) => ApiError::InternalServerError(e), + if e.is_cancel() { + ApiError::ShuttingDown + } else { + ApiError::InternalServerError(e.into_anyhow()) } )?; } @@ -3940,9 +3941,14 @@ pub fn make_router( .expect("construct launch timestamp header middleware"), ); + let force_metric_collection_on_scrape = state.conf.force_metric_collection_on_scrape; + + let prometheus_metrics_handler_wrapper = + move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape); + Ok(router .data(state) - .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) + .get("/metrics", move |r| request_span(r, prometheus_metrics_handler_wrapper)) 
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) .get("/profile/heap", |r| request_span(r, profile_heap_handler)) .get("/v1/status", |r| api_handler(r, status_handler)) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 0dd3c465e0..0864026f6b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -73,6 +73,9 @@ pub struct HttpEndpointListener(pub CancellableTask); pub struct HttpsEndpointListener(pub CancellableTask); pub struct ConsumptionMetricsTasks(pub CancellableTask); pub struct DiskUsageEvictionTask(pub CancellableTask); +// HADRON +pub struct MetricsCollectionTask(pub CancellableTask); + impl CancellableTask { pub async fn shutdown(self) { self.cancel.cancel(); @@ -87,6 +90,7 @@ pub async fn shutdown_pageserver( https_listener: Option, page_service: page_service::Listener, grpc_task: Option, + metrics_collection_task: MetricsCollectionTask, consumption_metrics_worker: ConsumptionMetricsTasks, disk_usage_eviction_task: Option, tenant_manager: &TenantManager, @@ -211,6 +215,14 @@ pub async fn shutdown_pageserver( // Best effort to persist any outstanding deletions, to avoid leaking objects deletion_queue.shutdown(Duration::from_secs(5)).await; + // HADRON + timed( + metrics_collection_task.0.shutdown(), + "shutdown metrics collections metrics", + Duration::from_secs(1), + ) + .await; + timed( consumption_metrics_worker.0.shutdown(), "shutdown consumption metrics", diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 72a3d800b9..a184cb7e91 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3327,7 +3327,7 @@ impl TenantShard { // Ignore this, we likely raced with unarchival. OffloadError::NotArchived => Ok(()), OffloadError::AlreadyInProgress => Ok(()), - OffloadError::Cancelled => Err(CompactionError::ShuttingDown), + OffloadError::Cancelled => Err(CompactionError::new_cancelled()), // don't break the anyhow chain OffloadError::Other(err) => Err(CompactionError::Other(err)), })?; @@ -3357,16 +3357,13 @@ impl TenantShard { /// Trips the compaction circuit breaker if appropriate. 
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) { - match err { - err if err.is_cancel() => {} - CompactionError::ShuttingDown => (), - CompactionError::Other(err) => { - self.compaction_circuit_breaker - .lock() - .unwrap() - .fail(&CIRCUIT_BREAKERS_BROKEN, err); - } + if err.is_cancel() { + return; } + self.compaction_circuit_breaker + .lock() + .unwrap() + .fail(&CIRCUIT_BREAKERS_BROKEN, err); } /// Cancel scheduled compaction tasks @@ -4210,6 +4207,15 @@ impl TenantShard { .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } + // HADRON + pub fn get_image_creation_timeout(&self) -> Option { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf.image_layer_force_creation_period.or(self + .conf + .default_tenant_conf + .image_layer_force_creation_period) + } + pub fn get_pitr_interval(&self) -> Duration { let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); tenant_conf diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index bcece5589a..08fc7d61a5 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -17,17 +17,14 @@ use tracing::*; use utils::backoff::exponential_backoff_duration; use utils::completion::Barrier; use utils::pausable_failpoint; -use utils::sync::gate::GateError; use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS}; use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind}; -use crate::tenant::blob_io::WriteBlobError; use crate::tenant::throttle::Stats; use crate::tenant::timeline::CompactionError; use crate::tenant::timeline::compaction::CompactionOutcome; use crate::tenant::{TenantShard, TenantState}; -use crate::virtual_file::owned_buffers_io::write::FlushTaskError; /// Semaphore limiting concurrent background tasks (across all tenants). 
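Both `TenantShard::get_image_creation_timeout` above and its `Timeline` twin later in this diff resolve the new `image_layer_force_creation_period` setting the same way: a per-tenant override wins, otherwise the pageserver-wide default applies, and `None` from both sides disables forced image creation entirely. A sketch with stand-in types (`TenantOverrides`/`Defaults` are hypothetical; the field name is the real one):

```rust
use std::time::Duration;

// Stand-ins for the per-tenant override struct and the pageserver defaults.
struct TenantOverrides { image_layer_force_creation_period: Option<Duration> }
struct Defaults { image_layer_force_creation_period: Option<Duration> }

// Override-or-default resolution: Option::or expresses the whole policy.
fn effective_force_creation_period(t: &TenantOverrides, d: &Defaults) -> Option<Duration> {
    t.image_layer_force_creation_period
        .or(d.image_layer_force_creation_period)
}

fn main() {
    let t = TenantOverrides { image_layer_force_creation_period: None };
    let d = Defaults { image_layer_force_creation_period: Some(Duration::from_secs(60)) };
    assert_eq!(effective_force_creation_period(&t, &d), Some(Duration::from_secs(60)));
}
```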
/// @@ -310,45 +307,12 @@ pub(crate) fn log_compaction_error( task_cancelled: bool, degrade_to_warning: bool, ) { - use CompactionError::*; + let is_cancel = err.is_cancel(); - use crate::tenant::PageReconstructError; - use crate::tenant::upload_queue::NotInitialized; - - let level = match err { - e if e.is_cancel() => return, - ShuttingDown => return, - _ if task_cancelled => Level::INFO, - Other(err) => { - let root_cause = err.root_cause(); - - let upload_queue = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_stopping()); - let timeline = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let buffered_writer_flush_task_canelled = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let write_blob_cancelled = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let gate_closed = root_cause - .downcast_ref::() - .is_some_and(|e| e.is_cancel()); - let is_stopping = upload_queue - || timeline - || buffered_writer_flush_task_canelled - || write_blob_cancelled - || gate_closed; - - if is_stopping { - Level::INFO - } else { - Level::ERROR - } - } + let level = if is_cancel || task_cancelled { + Level::INFO + } else { + Level::ERROR }; if let Some((error_count, sleep_duration)) = retry_info { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4cb5519c9c..df3321c960 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -352,6 +352,13 @@ pub struct Timeline { last_image_layer_creation_check_at: AtomicLsn, last_image_layer_creation_check_instant: std::sync::Mutex>, + // HADRON + /// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation + /// on this key range. + force_image_creation_lsn: AtomicLsn, + /// The last time instant when force_image_creation_lsn is computed. + force_image_creation_lsn_computed_at: std::sync::Mutex>, + /// Current logical size of the "datadir", at the last LSN. current_logical_size: LogicalSize, @@ -1003,7 +1010,7 @@ impl From for tonic::Status { impl From for CompactionError { fn from(e: CreateImageLayersError) -> Self { match e { - CreateImageLayersError::Cancelled => CompactionError::ShuttingDown, + CreateImageLayersError::Cancelled => CompactionError::new_cancelled(), CreateImageLayersError::Other(e) => { CompactionError::Other(e.context("create image layers")) } @@ -2156,12 +2163,7 @@ impl Timeline { match &result { Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed), Err(e) if e.is_cancel() => {} - Err(CompactionError::ShuttingDown) => { - // Covered by the `Err(e) if e.is_cancel()` branch. 
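The rewritten `log_compaction_error` above no longer downcasts root causes itself; that ladder moves into `CompactionError::is_cancel` (see the `compaction_error` module later in this diff), leaving a single severity rule. Distilled into a standalone function (hypothetical helper, assuming the `tracing` crate the codebase already uses):

```rust
use tracing::Level;

// Cancellation -- whether reported by the error itself (is_cancel) or by the
// surrounding task's cancellation flag -- is expected during shutdown and
// logs at INFO; anything else is a genuine compaction failure at ERROR.
fn compaction_error_level(is_cancel: bool, task_cancelled: bool) -> Level {
    if is_cancel || task_cancelled {
        Level::INFO
    } else {
        Level::ERROR
    }
}

fn main() {
    assert_eq!(compaction_error_level(true, false), Level::INFO);
    assert_eq!(compaction_error_level(false, true), Level::INFO);
    assert_eq!(compaction_error_level(false, false), Level::ERROR);
}
```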
- } - Err(CompactionError::Other(_)) => { - self.compaction_failed.store(true, AtomicOrdering::Relaxed) - } + Err(_) => self.compaction_failed.store(true, AtomicOrdering::Relaxed), }; result @@ -2898,6 +2900,18 @@ impl Timeline { .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold) } + // HADRON + fn get_image_creation_timeout(&self) -> Option { + let tenant_conf = self.tenant_conf.load(); + tenant_conf + .tenant_conf + .image_layer_force_creation_period + .or(self + .conf + .default_tenant_conf + .image_layer_force_creation_period) + } + fn get_compaction_algorithm_settings(&self) -> CompactionAlgorithmSettings { let tenant_conf = &self.tenant_conf.load(); tenant_conf @@ -3165,7 +3179,9 @@ impl Timeline { repartition_threshold: 0, last_image_layer_creation_check_at: AtomicLsn::new(0), last_image_layer_creation_check_instant: Mutex::new(None), - + // HADRON + force_image_creation_lsn: AtomicLsn::new(0), + force_image_creation_lsn_computed_at: std::sync::Mutex::new(None), last_received_wal: Mutex::new(None), rel_size_latest_cache: RwLock::new(HashMap::new()), rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)), @@ -5088,6 +5104,7 @@ impl Timeline { .create_image_layers( &partitions, self.initdb_lsn, + None, ImageLayerCreationMode::Initial, ctx, LastImageLayerCreationStatus::Initial, @@ -5359,14 +5376,19 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? True if we want to generate. - async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + force_image_creation_lsn: Option, + ) -> bool { let threshold = self.get_image_creation_threshold(); let guard = self.layers.read(LayerManagerLockHolder::Compaction).await; let Ok(layers) = guard.layer_map() else { return false; }; - + let mut min_image_lsn: Lsn = Lsn::MAX; let mut max_deltas = 0; for part_range in &partition.ranges { let image_coverage = layers.image_coverage(part_range, lsn); @@ -5401,9 +5423,22 @@ impl Timeline { return true; } } + min_image_lsn = min(min_image_lsn, img_lsn); } } + // HADRON + if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 { + info!( + "forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}", + partition.ranges[0].start, + partition.ranges[0].end, + min_image_lsn, + force_image_creation_lsn.unwrap() + ); + return true; + } + debug!( max_deltas, "none of the partitioned ranges had >= {threshold} deltas" @@ -5629,7 +5664,7 @@ impl Timeline { /// suffer from the lack of image layers /// 2. 
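The force gate added to `time_for_new_image_layer` above tracks the oldest image LSN across the partition's ranges and fires when that image predates the force LSN while at least one delta sits on top of it. The same predicate in isolation (hypothetical helper; this `Lsn` is a stand-in for the pageserver newtype):

```rust
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct Lsn(u64);

// Mirrors the check above: with no force LSN configured, the comparison is
// against Lsn(0), which no image LSN can be below, so the gate never fires.
fn must_force_image_creation(
    min_image_lsn: Lsn,
    force_image_creation_lsn: Option<Lsn>,
    max_deltas: usize,
) -> bool {
    min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0
}

fn main() {
    assert!(must_force_image_creation(Lsn(10), Some(Lsn(100)), 3));
    assert!(!must_force_image_creation(Lsn(10), Some(Lsn(100)), 0)); // no deltas on top
    assert!(!must_force_image_creation(Lsn(10), None, 3)); // feature disabled
}
```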
For small tenants (that can mostly fit in RAM), we use a much longer interval fn should_check_if_image_layers_required(self: &Arc, lsn: Lsn) -> bool { - const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024; + let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold; let last_checks_at = self.last_image_layer_creation_check_at.load(); let distance = lsn @@ -5643,12 +5678,12 @@ impl Timeline { let mut time_based_decision = false; let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap(); if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() { - let check_required_after = if Into::::into(&logical_size) >= LARGE_TENANT_THRESHOLD - { - self.get_checkpoint_timeout() - } else { - Duration::from_secs(3600 * 48) - }; + let check_required_after = + if Some(Into::::into(&logical_size)) >= large_timeline_threshold { + self.get_checkpoint_timeout() + } else { + Duration::from_secs(3600 * 48) + }; time_based_decision = match *last_check_instant { Some(last_check) => { @@ -5676,10 +5711,12 @@ impl Timeline { /// true = we have generate all image layers, false = we preempt the process for L0 compaction. /// /// `partition_mode` is only for logging purpose and is not used anywhere in this function. + #[allow(clippy::too_many_arguments)] async fn create_image_layers( self: &Arc, partitioning: &KeyPartitioning, lsn: Lsn, + force_image_creation_lsn: Option, mode: ImageLayerCreationMode, ctx: &RequestContext, last_status: LastImageLayerCreationStatus, @@ -5783,7 +5820,11 @@ impl Timeline { } else if let ImageLayerCreationMode::Try = mode { // check_for_image_layers = false -> skip // check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate - if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await { + if !check_for_image_layers + || !self + .time_for_new_image_layer(partition, lsn, force_image_creation_lsn) + .await + { start = img_range.end; continue; } @@ -6104,26 +6145,88 @@ impl Drop for Timeline { } } -/// Top-level failure to compact. -#[derive(Debug, thiserror::Error)] -pub(crate) enum CompactionError { - #[error("The timeline or pageserver is shutting down")] - ShuttingDown, - #[error(transparent)] - Other(anyhow::Error), -} +pub(crate) use compaction_error::CompactionError; +/// In a private mod to enforce that [`CompactionError::is_cancel`] is used +/// instead of `match`ing on [`CompactionError::ShuttingDown`]. +mod compaction_error { + use utils::sync::gate::GateError; -impl CompactionError { - /// Errors that can be ignored, i.e., cancel and shutdown. - pub fn is_cancel(&self) -> bool { - matches!(self, Self::ShuttingDown) + use crate::{ + pgdatadir_mapping::CollectKeySpaceError, + tenant::{PageReconstructError, blob_io::WriteBlobError, upload_queue::NotInitialized}, + virtual_file::owned_buffers_io::write::FlushTaskError, + }; + + /// Top-level failure to compact. Use [`Self::is_cancel`]. + #[derive(Debug, thiserror::Error)] + pub(crate) enum CompactionError { + /// Use [`Self::is_cancel`] instead of checking for this variant. + #[error("The timeline or pageserver is shutting down")] + #[allow(private_interfaces)] + ShuttingDown(ForbidMatching), // private ForbidMatching enforces use of [`Self::is_cancel`]. 
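The size comparison above leans on `Option`'s derived ordering: `None` sorts below every `Some`, so `Some(logical_size) >= large_timeline_threshold` is always true on a pageserver with no `image_layer_generation_large_timeline_threshold` configured, and every timeline then gets the frequent (checkpoint-timeout) check cadence. A sketch:

```rust
// Hypothetical reduction of the comparison above. None < Some(x) for any x,
// which is what makes "no threshold" mean "treat everything as large".
fn is_large_timeline(logical_size: u64, threshold: Option<u64>) -> bool {
    Some(logical_size) >= threshold
}

fn main() {
    assert!(is_large_timeline(0, None)); // no threshold: everything is "large"
    assert!(is_large_timeline(4 << 30, Some(2 << 30))); // above a 2 GiB threshold
    assert!(!is_large_timeline(1 << 30, Some(2 << 30))); // below it
}
```

The test added later in this diff exploits the same ordering by setting the threshold to 0, which likewise classifies every timeline as large.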
+ #[error(transparent)] + Other(anyhow::Error), } - pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self { - if err.is_cancel() { - Self::ShuttingDown - } else { - Self::Other(err.into_anyhow()) + #[derive(Debug)] + struct ForbidMatching; + + impl CompactionError { + pub fn new_cancelled() -> Self { + Self::ShuttingDown(ForbidMatching) + } + /// Errors that can be ignored, i.e., cancel and shutdown. + pub fn is_cancel(&self) -> bool { + let other = match self { + CompactionError::ShuttingDown(_) => return true, + CompactionError::Other(other) => other, + }; + + // The write path of compaction in particular often fails to distinguish + // errors stemming from cancellation from other errors. + // So we also check the ::Other variant by downcasting. + // The list below has been found empirically from flaky tests and production logs. + // The process is simple: on ::Other(), compaction will print the enclosed + // anyhow::Error in debug mode, i.e., with backtrace. That backtrace contains the + // line where the write path / compaction code does undifferentiated error handling + // from a non-anyhow type to an anyhow type. Add the type to the list of downcasts + // below, following the same is_cancel() pattern. + + let root_cause = other.root_cause(); + + let upload_queue = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_stopping()); + let timeline = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + let buffered_writer_flush_task_cancelled = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + let write_blob_cancelled = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + let gate_closed = root_cause + .downcast_ref::() + .is_some_and(|e| e.is_cancel()); + upload_queue + || timeline + || buffered_writer_flush_task_cancelled + || write_blob_cancelled + || gate_closed + } + pub fn into_anyhow(self) -> anyhow::Error { + match self { + CompactionError::ShuttingDown(ForbidMatching) => anyhow::Error::new(self), + CompactionError::Other(e) => e, + } + } + pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self { + if err.is_cancel() { + Self::new_cancelled() + } else { + Self::Other(err.into_anyhow()) + } } } } @@ -6135,7 +6238,7 @@ impl From for CompactionError { CompactionError::Other(anyhow::anyhow!(value)) } super::upload_queue::NotInitialized::ShuttingDown - | super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown, + | super::upload_queue::NotInitialized::Stopped => CompactionError::new_cancelled(), } } } @@ -6145,7 +6248,7 @@ impl From for CompactionError { match e { super::storage_layer::layer::DownloadError::TimelineShutdown | super::storage_layer::layer::DownloadError::DownloadCancelled => { - CompactionError::ShuttingDown + CompactionError::new_cancelled() } super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads | super::storage_layer::layer::DownloadError::DownloadRequired @@ -6164,14 +6267,14 @@ impl From for CompactionError { impl From for CompactionError { fn from(_: layer_manager::Shutdown) -> Self { - CompactionError::ShuttingDown + CompactionError::new_cancelled() } } impl From for CompactionError { fn from(e: super::storage_layer::errors::PutError) -> Self { if e.is_cancel() { - CompactionError::ShuttingDown + CompactionError::new_cancelled() } else { CompactionError::Other(e.into_anyhow()) } @@ -6270,7 +6373,7 @@ impl Timeline { let mut guard = tokio::select!
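`ForbidMatching` is the heart of the refactor above: a unit struct private to the `compaction_error` module, embedded in the `ShuttingDown` variant, so code outside the module cannot construct the variant directly and is funneled through `new_cancelled()` and `is_cancel()`. The trick reduced to a self-contained example (`OpError` and the module name are hypothetical):

```rust
mod err {
    #[derive(Debug)]
    struct ForbidMatching; // private: not nameable outside this module

    #[derive(Debug)]
    pub enum OpError {
        // Constructing this variant requires naming ForbidMatching,
        // which only code inside `err` can do.
        #[allow(private_interfaces)]
        Cancelled(ForbidMatching),
        Other(String),
    }

    impl OpError {
        pub fn new_cancelled() -> Self {
            OpError::Cancelled(ForbidMatching)
        }
        pub fn is_cancel(&self) -> bool {
            matches!(self, OpError::Cancelled(_))
        }
    }
}

fn main() {
    // `err::OpError::Cancelled(...)` is not constructible from here;
    // the accessor pair is the only sanctioned interface.
    let e = err::OpError::new_cancelled();
    assert!(e.is_cancel());
}
```

The private payload is primarily a construction gate and a strong documentation hint; the module doc comment carries the actual "use `is_cancel`, don't `match`" contract.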
{ guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard, _ = self.cancel.cancelled() => { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } }; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 7ebef03e78..720ec6073d 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -4,10 +4,11 @@ //! //! The old legacy algorithm is implemented directly in `timeline.rs`. +use std::cmp::min; use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use super::layer_manager::LayerManagerLockHolder; use super::{ @@ -33,6 +34,7 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange}; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; use pageserver_compaction::helpers::{fully_contains, overlaps_with}; use pageserver_compaction::interface::*; +use postgres_ffi::to_pg_timestamp; use serde::Serialize; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio_util::sync::CancellationToken; @@ -45,6 +47,7 @@ use wal_decoder::models::value::Value; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; +use crate::pgdatadir_mapping::LsnForTimestamp; use crate::statvfs::Statvfs; use crate::tenant::checks::check_valid_layermap; use crate::tenant::gc_block::GcBlock; @@ -572,8 +575,8 @@ impl GcCompactionQueue { } match res { Ok(res) => Ok(res), - Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown), - Err(CompactionError::Other(_)) => { + Err(e) if e.is_cancel() => Err(e), + Err(_) => { // There are some cases where traditional gc might collect some layer // files causing gc-compaction cannot read the full history of the key. // This needs to be resolved in the long-term by improving the compaction @@ -1260,13 +1263,19 @@ impl Timeline { // Is the timeline being deleted? if self.is_stopping() { trace!("Dropping out of compaction on timeline shutdown"); - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let target_file_size = self.get_checkpoint_distance(); // Define partitioning schema if needed + // HADRON + let force_image_creation_lsn = self + .get_or_compute_force_image_creation_lsn(cancel, ctx) + .await + .map_err(CompactionError::Other)?; + // 1. L0 Compact let l0_outcome = { let timer = self.metrics.compact_time_histo.start_timer(); @@ -1274,6 +1283,7 @@ impl Timeline { .compact_level0( target_file_size, options.flags.contains(CompactFlags::ForceL0Compaction), + force_image_creation_lsn, ctx, ) .await?; @@ -1376,6 +1386,7 @@ impl Timeline { .create_image_layers( &partitioning, lsn, + force_image_creation_lsn, mode, &image_ctx, self.last_image_layer_creation_status @@ -1472,6 +1483,63 @@ impl Timeline { Ok(CompactionOutcome::Done) } + /* BEGIN_HADRON */ + // Get the force image creation LSN. Compute it if the last computed LSN is too old. 
+ async fn get_or_compute_force_image_creation_lsn( + self: &Arc, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> anyhow::Result> { + const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes + let image_layer_force_creation_period = self.get_image_creation_timeout(); + if image_layer_force_creation_period.is_none() { + return Ok(None); + } + + let image_layer_force_creation_period = image_layer_force_creation_period.unwrap(); + let force_image_creation_lsn_computed_at = + *self.force_image_creation_lsn_computed_at.lock().unwrap(); + if force_image_creation_lsn_computed_at.is_none() + || force_image_creation_lsn_computed_at.unwrap().elapsed() + > FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL + { + let now: SystemTime = SystemTime::now(); + let timestamp = now + .checked_sub(image_layer_force_creation_period) + .ok_or_else(|| { + anyhow::anyhow!( + "image creation timeout is too large: {image_layer_force_creation_period:?}" + ) + })?; + let timestamp = to_pg_timestamp(timestamp); + let force_image_creation_lsn = match self + .find_lsn_for_timestamp(timestamp, cancel, ctx) + .await? + { + LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn, + _ => { + let gc_lsn = *self.get_applied_gc_cutoff_lsn(); + tracing::info!( + "no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}", + gc_lsn + ); + gc_lsn + } + }; + self.force_image_creation_lsn + .store(force_image_creation_lsn); + *self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now()); + tracing::info!( + "computed force image creation LSN: {}", + force_image_creation_lsn + ); + Ok(Some(force_image_creation_lsn)) + } else { + Ok(Some(self.force_image_creation_lsn.load())) + } + } + /* END_HADRON */ + /// Check for layers that are elegible to be rewritten: /// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that /// we don't indefinitely retain keys in this shard that aren't needed. @@ -1624,7 +1692,7 @@ impl Timeline { for (i, layer) in layers_to_rewrite.into_iter().enumerate() { if self.cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total); @@ -1722,7 +1790,7 @@ impl Timeline { Ok(()) => {}, Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)), Err(WaitCompletionError::UploadQueueShutDownOrStopped) => { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } }, // Don't wait if there's L0 compaction to do. 
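`get_or_compute_force_image_creation_lsn` above is a time-bucketed memoization: at most once per `FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL` it maps `now - image_layer_force_creation_period` to an LSN via `find_lsn_for_timestamp` (falling back to the GC cutoff when history does not reach that far back), and otherwise serves the cached value. The caching skeleton, stripped of the LSN lookup (a sketch; the real code spreads this state across an `AtomicLsn` and a `Mutex<Option<Instant>>`):

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Recompute-at-most-every-interval cache, generic over a Copy value.
struct Cached<T: Copy> {
    value: Mutex<Option<(T, Instant)>>,
    interval: Duration,
}

impl<T: Copy> Cached<T> {
    fn get_or_compute(&self, compute: impl FnOnce() -> T) -> T {
        let mut slot = self.value.lock().unwrap();
        match *slot {
            // Fresh enough: serve the cached value.
            Some((v, at)) if at.elapsed() <= self.interval => v,
            // Never computed, or stale: recompute and restamp.
            _ => {
                let v = compute();
                *slot = Some((v, Instant::now()));
                v
            }
        }
    }
}

fn main() {
    let c = Cached { value: Mutex::new(None), interval: Duration::from_secs(600) };
    let a = c.get_or_compute(|| 42);
    let b = c.get_or_compute(|| 7); // within the interval: cached 42 is returned
    assert_eq!((a, b), (42, 42));
}
```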
We don't need to update the outcome @@ -1801,6 +1869,7 @@ impl Timeline { self: &Arc, target_file_size: u64, force_compaction_ignore_threshold: bool, + force_compaction_lsn: Option, ctx: &RequestContext, ) -> Result { let CompactLevel0Phase1Result { @@ -1821,6 +1890,7 @@ impl Timeline { stats, target_file_size, force_compaction_ignore_threshold, + force_compaction_lsn, &ctx, ) .instrument(phase1_span) @@ -1843,6 +1913,7 @@ impl Timeline { mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, force_compaction_ignore_threshold: bool, + force_compaction_lsn: Option, ctx: &RequestContext, ) -> Result { let begin = tokio::time::Instant::now(); @@ -1872,11 +1943,28 @@ impl Timeline { return Ok(CompactLevel0Phase1Result::default()); } } else { - debug!( - level0_deltas = level0_deltas.len(), - threshold, "too few deltas to compact" - ); - return Ok(CompactLevel0Phase1Result::default()); + // HADRON + let min_lsn = level0_deltas + .iter() + .map(|a| a.get_lsn_range().start) + .reduce(min); + if force_compaction_lsn.is_some() + && min_lsn.is_some() + && min_lsn.unwrap() < force_compaction_lsn.unwrap() + { + info!( + "forcing L0 compaction of {} L0 deltas. Min lsn: {}, force compaction lsn: {}", + level0_deltas.len(), + min_lsn.unwrap(), + force_compaction_lsn.unwrap() + ); + } else { + debug!( + level0_deltas = level0_deltas.len(), + threshold, "too few deltas to compact" + ); + return Ok(CompactLevel0Phase1Result::default()); + } } } @@ -1985,7 +2073,7 @@ impl Timeline { let mut all_keys = Vec::new(); for l in deltas_to_compact.iter() { if self.cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?; let keys = delta @@ -2078,7 +2166,7 @@ impl Timeline { stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now(); if self.cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now(); @@ -2186,7 +2274,7 @@ impl Timeline { // avoid hitting the cancellation token on every key. in benches, we end up // shuffling an order of million keys per layer, this means we'll check it // around tens of times per layer. - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let same_key = prev_key == Some(key); @@ -2271,7 +2359,7 @@ impl Timeline { if writer.is_none() { if self.cancel.is_cancelled() { // to be somewhat responsive to cancellation, check for each new layer - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } // Create writer if not initiaized yet writer = Some( @@ -2527,7 +2615,7 @@ impl Timeline { // Is the timeline being deleted? if self.is_stopping() { trace!("Dropping out of compaction on timeline shutdown"); - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let (dense_ks, _sparse_ks) = self @@ -3189,7 +3277,7 @@ impl Timeline { let gc_lock = async { tokio::select! 
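The L0 counterpart of the image-layer gate sits in the hunk above: `compact_level0_phase1` normally returns early below the compaction threshold, but now proceeds when the oldest L0 delta starts before the force-compaction LSN. Distilled (hypothetical helper; plain `u64` standing in for `Lsn`):

```rust
// Mirrors the gate above: both the force LSN and at least one L0 delta must
// exist, and the oldest delta's start LSN must predate the force LSN.
fn should_force_l0_compaction(l0_start_lsns: &[u64], force_compaction_lsn: Option<u64>) -> bool {
    match (l0_start_lsns.iter().min(), force_compaction_lsn) {
        (Some(&oldest), Some(force_lsn)) => oldest < force_lsn,
        _ => false,
    }
}

fn main() {
    assert!(should_force_l0_compaction(&[100, 250], Some(200)));
    assert!(!should_force_l0_compaction(&[300], Some(200))); // all deltas are recent
    assert!(!should_force_l0_compaction(&[], Some(200))); // nothing to compact
    assert!(!should_force_l0_compaction(&[100], None)); // feature disabled
}
```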
{ guard = self.gc_lock.lock() => Ok(guard), - _ = cancel.cancelled() => Err(CompactionError::ShuttingDown), + _ = cancel.cancelled() => Err(CompactionError::new_cancelled()), } }; @@ -3462,7 +3550,7 @@ impl Timeline { } total_layer_size += layer.layer_desc().file_size; if cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let should_yield = yield_for_l0 && self @@ -3609,7 +3697,7 @@ impl Timeline { } if cancel.is_cancelled() { - return Err(CompactionError::ShuttingDown); + return Err(CompactionError::new_cancelled()); } let should_yield = yield_for_l0 diff --git a/pgxn/neon/neon_ddl_handler.c b/pgxn/neon/neon_ddl_handler.c index 2ce7b0086b..1f03e52c67 100644 --- a/pgxn/neon/neon_ddl_handler.c +++ b/pgxn/neon/neon_ddl_handler.c @@ -953,7 +953,9 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private) /* * Fire Event Trigger if both function owner and current user are - * superuser, or none of them are. + * superuser. Allow executing an Event Trigger function that belongs to a + * superuser when connected as a non-superuser, even when the function is + * SECURITY DEFINER. */ else if (event == FHET_START /* still enable it to pass pg_regress tests */ @@ -976,32 +978,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private) function_is_owned_by_super = superuser_arg(function_owner); /* - * 1. Refuse to run SECURITY DEFINER function that belongs to a - * superuser when the current user is not a superuser itself. - */ - if (!role_is_super - && function_is_owned_by_super - && function_is_secdef) - { - char *func_name = get_func_name(flinfo->fn_oid); - - ereport(WARNING, - (errmsg("Skipping Event Trigger"), - errdetail("Event Trigger function \"%s\" is owned by \"%s\" " - "and is SECURITY DEFINER", - func_name, - GetUserNameFromId(function_owner, false)))); - - /* - * we can't skip execution directly inside the fmgr_hook so - * instead we change the event trigger function to a noop - * function. - */ - force_noop(flinfo); - } - - /* - * 2. Refuse to run functions that belongs to a non-superuser when the + * Refuse to run functions that belong to a non-superuser when the * current user is a superuser. * * We could run a SECURITY DEFINER user-function here and be safe with @@ -1009,7 +986,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private) * infrastructure maintenance operations, where we prefer to skip * running user-defined code.
*/ - else if (role_is_super && !function_is_owned_by_super) + if (role_is_super && !function_is_owned_by_super) { char *func_name = get_func_name(flinfo->fn_oid); diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index b55cc14532..4d8df19476 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -267,7 +267,7 @@ async fn worker_inner( ) -> anyhow::Result<()> { #[cfg(any(test, feature = "testing"))] let storage = if config.test_remote_failures > 0 { - GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures) + GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures, 100) } else { storage }; diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 8fda625817..b2d5976ef4 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -37,6 +37,7 @@ use tracing::*; use utils::auth::{JwtAuth, Scope, SwappableJwtAuth}; use utils::id::NodeId; use utils::logging::{self, LogFormat, SecretString}; +use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR}; use utils::sentry_init::init_sentry; use utils::{pid_file, project_build_tag, project_git_version, tcp_listener}; @@ -243,6 +244,11 @@ struct Args { #[arg(long)] enable_tls_wal_service_api: bool, + /// Controls whether to collect all metrics on each scrape or to return potentially stale + /// results. + #[arg(long, default_value_t = true)] + force_metric_collection_on_scrape: bool, + /// Run in development mode (disables security checks) #[arg(long, help = "Run in development mode (disables security checks)")] dev: bool, @@ -428,6 +434,7 @@ async fn main() -> anyhow::Result<()> { ssl_ca_certs, use_https_safekeeper_api: args.use_https_safekeeper_api, enable_tls_wal_service_api: args.enable_tls_wal_service_api, + force_metric_collection_on_scrape: args.force_metric_collection_on_scrape, }); // initialize sentry if SENTRY_DSN is provided @@ -640,6 +647,26 @@ async fn start_safekeeper(conf: Arc) -> Result<()> { .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); + /* BEGIN_HADRON */ + if conf.force_metric_collection_on_scrape { + let metrics_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| BACKGROUND_RUNTIME.handle()) + .spawn(async move { + let mut interval: tokio::time::Interval = + tokio::time::interval(METRICS_COLLECTION_INTERVAL); + loop { + interval.tick().await; + tokio::task::spawn_blocking(|| { + METRICS_COLLECTOR.run_once(true); + }); + } + }) + .map(|res| ("metrics collection".to_owned(), res)); + tasks_handles.push(Box::pin(metrics_handle)); + } + /* END_HADRON */ + set_build_info_metric(GIT_VERSION, BUILD_TAG); // TODO: update tokio-stream, convert to real async Stream with diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 384c582678..4b061c65d9 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -699,6 +699,11 @@ pub fn make_router( })) } + let force_metric_collection_on_scrape = conf.force_metric_collection_on_scrape; + + let prometheus_metrics_handler_wrapper = + move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape); + // NB: on any changes do not forget to update the OpenAPI spec // located nearby (/safekeeper/src/http/openapi_spec.yaml).
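When `force_metric_collection_on_scrape` is set, the safekeeper above also spawns a periodic refresher that ticks on `METRICS_COLLECTION_INTERVAL` and hands the synchronous collection to the blocking pool. The skeleton of such a loop (a sketch; `collect` stands in for the `METRICS_COLLECTOR.run_once(true)` call):

```rust
use std::time::Duration;

// Each tick hands the blocking metrics collection to spawn_blocking, so the
// async executor threads are never stalled by it.
async fn metrics_refresher(interval: Duration, collect: fn()) {
    let mut ticker = tokio::time::interval(interval);
    loop {
        ticker.tick().await;
        tokio::task::spawn_blocking(collect);
    }
}

#[tokio::main]
async fn main() {
    // Example wiring; a real caller keeps this task alive for the process
    // lifetime instead of aborting it.
    let task = tokio::spawn(metrics_refresher(Duration::from_secs(30), || {
        // collect-and-cache work would happen here
    }));
    task.abort(); // the sketch doesn't run forever
}
```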
let auth = conf.http_auth.clone(); @@ -706,7 +711,9 @@ pub fn make_router( .data(conf) .data(global_timelines) .data(auth) - .get("/metrics", |r| request_span(r, prometheus_metrics_handler)) + .get("/metrics", move |r| { + request_span(r, prometheus_metrics_handler_wrapper) + }) .get("/profile/cpu", |r| request_span(r, profile_cpu_handler)) .get("/profile/heap", |r| request_span(r, profile_heap_handler)) .get("/v1/status", |r| request_span(r, status_handler)) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index c461c071da..c0b5403ebf 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -134,6 +134,7 @@ pub struct SafeKeeperConf { pub ssl_ca_certs: Vec, pub use_https_safekeeper_api: bool, pub enable_tls_wal_service_api: bool, + pub force_metric_collection_on_scrape: bool, } impl SafeKeeperConf { @@ -183,6 +184,7 @@ impl SafeKeeperConf { ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, enable_tls_wal_service_api: false, + force_metric_collection_on_scrape: true, } } } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 7e10847a1b..0e8dfd64c3 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -166,7 +166,7 @@ fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option) -> Result<()> { ssl_ca_certs: Vec::new(), use_https_safekeeper_api: false, enable_tls_wal_service_api: false, + force_metric_collection_on_scrape: true, }; let mut global = GlobalMap::new(disk, conf.clone())?; diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ed6643d641..9c1b81d261 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1984,11 +1984,14 @@ impl Service { }); // Check that there is enough safekeepers configured that we can create new timelines - let test_sk_res = this.safekeepers_for_new_timeline().await; + let test_sk_res_str = match this.safekeepers_for_new_timeline().await { + Ok(v) => format!("Ok({v:?})"), + Err(v) => format!("Err({v:})"), + }; tracing::info!( timeline_safekeeper_count = config.timeline_safekeeper_count, timelines_onto_safekeepers = config.timelines_onto_safekeepers, - "viability test result (test timeline creation on safekeepers): {test_sk_res:?}", + "viability test result (test timeline creation on safekeepers): {test_sk_res_str}", ); Ok(this) @@ -4428,7 +4431,7 @@ impl Service { .await; let mut failed = 0; - for (tid, result) in targeted_tenant_shards.iter().zip(results.into_iter()) { + for (tid, (_, result)) in targeted_tenant_shards.iter().zip(results.into_iter()) { match result { Ok(ok) => { if tid.is_shard_zero() { @@ -4758,6 +4761,7 @@ impl Service { ) .await; + let mut retry_if_not_attached = false; let targets = { let locked = self.inner.read().unwrap(); let mut targets = Vec::new(); @@ -4774,6 +4778,24 @@ impl Service { .expect("Pageservers may not be deleted while referenced"); targets.push((*tenant_shard_id, node.clone())); + + if let Some(location) = shard.observed.locations.get(node_id) { + if let Some(ref conf) = location.conf { + if conf.mode != LocationConfigMode::AttachedSingle + && conf.mode != LocationConfigMode::AttachedMulti + { + // If the shard is attached as secondary, we need to retry if 404. + retry_if_not_attached = true; + } + // If the shard is attached as primary, we should succeed. + } else { + // Location conf is not available yet, retry if 404. + retry_if_not_attached = true; + } + } else { + // The shard is not attached to the intended pageserver yet, retry if 404. 
+ retry_if_not_attached = true; + } } } targets @@ -4795,7 +4817,7 @@ impl Service { .await; let mut valid_until = None; - for r in res { + for (node, r) in res { match r { Ok(lease) => { if let Some(ref mut valid_until) = valid_until { @@ -4804,8 +4826,20 @@ impl Service { valid_until = Some(lease.valid_until); } } + Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) + if retry_if_not_attached => + { + // This is expected if the attach is not finished yet. Return 503 so that the client can retry. + return Err(ApiError::ResourceUnavailable( + format!( + "Timeline is not attached to the pageserver {} yet, please retry", + node.get_id() + ) + .into(), + )); + } Err(e) => { - return Err(ApiError::InternalServerError(anyhow::anyhow!(e))); + return Err(passthrough_api_error(&node, e)); } } } @@ -4919,7 +4953,7 @@ impl Service { max_retries: u32, timeout: Duration, cancel: &CancellationToken, - ) -> Vec> + ) -> Vec<(Node, mgmt_api::Result)> where O: Fn(TenantShardId, PageserverClient) -> F + Copy, F: std::future::Future>, @@ -4940,16 +4974,16 @@ impl Service { cancel, ) .await; - (idx, r) + (idx, node, r) }); } - while let Some((idx, r)) = futs.next().await { - results.push((idx, r.unwrap_or(Err(mgmt_api::Error::Cancelled)))); + while let Some((idx, node, r)) = futs.next().await { + results.push((idx, node, r.unwrap_or(Err(mgmt_api::Error::Cancelled)))); } - results.sort_by_key(|(idx, _)| *idx); - results.into_iter().map(|(_, r)| r).collect() + results.sort_by_key(|(idx, _, _)| *idx); + results.into_iter().map(|(_, node, r)| (node, r)).collect() } /// Helper for safely working with the shards in a tenant remotely on pageservers, for example @@ -5862,7 +5896,7 @@ impl Service { return; } - for result in self + for (_, result) in self .tenant_for_shards_api( attached, |tenant_shard_id, client| async move { @@ -5881,7 +5915,7 @@ impl Service { } } - for result in self + for (_, result) in self .tenant_for_shards_api( secondary, |tenant_shard_id, client| async move { @@ -8768,7 +8802,7 @@ impl Service { ) .await; - for ((tenant_shard_id, node, optimization), secondary_status) in + for ((tenant_shard_id, node, optimization), (_, secondary_status)) in want_secondary_status.into_iter().zip(results.into_iter()) { match secondary_status { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f87182846e..6fae0ee8c2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1797,6 +1797,33 @@ def neon_env_builder( record_property("preserve_database_files", builder.preserve_database_files) +@pytest.fixture(scope="function") +def neon_env_builder_local( + neon_env_builder: NeonEnvBuilder, + test_output_dir: Path, + pg_distrib_dir: Path, +) -> NeonEnvBuilder: + """ + Fixture to create a Neon environment for test with its own pg_install copy. + + This allows the test to edit the list of available extensions in the + local instance of Postgres used for the test, and install extensions via + downloading them when a remote extension is tested, for instance, or + copying files around for local extension testing. + """ + test_local_pginstall = test_output_dir / "pg_install" + log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}") + + # We can't copy only the version that we are currently testing because other + # binaries like the storage controller need specific Postgres versions. 
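The lease call above now classifies a pageserver 404 by the shard's observed state before deciding whether to surface a 503 (so the client retries) or pass the error through. The decision extracted as a table (simplified stand-ins for the observed location state; `Mode` mirrors `LocationConfigMode`):

```rust
#[derive(PartialEq)]
enum Mode { AttachedSingle, AttachedMulti, Secondary }

// observed: None => shard not attached to the intended pageserver yet;
// Some(None) => location known but its config hasn't been reported yet;
// Some(Some(mode)) => config reported; retriable unless fully attached.
fn retry_404_as_503(observed: Option<Option<Mode>>) -> bool {
    match observed {
        None | Some(None) => true,
        Some(Some(mode)) => mode != Mode::AttachedSingle && mode != Mode::AttachedMulti,
    }
}

fn main() {
    assert!(retry_404_as_503(None));
    assert!(retry_404_as_503(Some(None)));
    assert!(retry_404_as_503(Some(Some(Mode::Secondary))));
    assert!(!retry_404_as_503(Some(Some(Mode::AttachedSingle))));
    assert!(!retry_404_as_503(Some(Some(Mode::AttachedMulti))));
}
```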
+ shutil.copytree(pg_distrib_dir, test_local_pginstall) + + neon_env_builder.pg_distrib_dir = test_local_pginstall + log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}") + + return neon_env_builder + + @dataclass class PageserverPort: pg: int diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 79cfba8da6..8e7d957b22 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -1002,7 +1002,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): def get_metrics_str(self) -> str: """You probably want to use get_metrics() instead.""" - res = self.get(f"http://localhost:{self.port}/metrics") + res = self.get(f"http://localhost:{self.port}/metrics?use_latest=true") self.verbose_error(res) return res.text diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 942b620be6..ceb00c0f90 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -143,7 +143,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): def get_metrics_str(self) -> str: """You probably want to use get_metrics() instead.""" - request_result = self.get(f"http://localhost:{self.port}/metrics") + request_result = self.get(f"http://localhost:{self.port}/metrics?use_latest=true") request_result.raise_for_status() return request_result.text diff --git a/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension--1.0.sql b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension--1.0.sql new file mode 100644 index 0000000000..2b82102802 --- /dev/null +++ b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension--1.0.sql @@ -0,0 +1,32 @@ +\echo Use "CREATE EXTENSION test_event_trigger_extension" to load this file. \quit + +CREATE SCHEMA event_trigger; + +create sequence if not exists event_trigger.seq_schema_version as int cycle; + +create or replace function event_trigger.increment_schema_version() + returns event_trigger + security definer + language plpgsql +as $$ +begin + perform pg_catalog.nextval('event_trigger.seq_schema_version'); +end; +$$; + +create or replace function event_trigger.get_schema_version() + returns int + security definer + language sql +as $$ + select last_value from event_trigger.seq_schema_version; +$$; + +-- On DDL event, increment the schema version number +create event trigger event_trigger_watch_ddl + on ddl_command_end + execute procedure event_trigger.increment_schema_version(); + +create event trigger event_trigger_watch_drop + on sql_drop + execute procedure event_trigger.increment_schema_version(); diff --git a/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension.control b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension.control new file mode 100644 index 0000000000..4fe8c3341b --- /dev/null +++ b/test_runner/regress/data/test_event_trigger_extension/test_event_trigger_extension.control @@ -0,0 +1,8 @@ +default_version = '1.0' +comment = 'Test extension with Event Trigger' + +# make sure the extension objects are owned by the bootstrap user +# to check that the SECURITY DEFINER event trigger function is still +# called during non-superuser DDL events. 
+superuser = true +trusted = true diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 009df34990..54e6a5dbdf 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -165,6 +165,7 @@ def test_fully_custom_config(positive_env: NeonEnv): "gc_horizon": 23 * (1024 * 1024), "gc_period": "2h 13m", "image_creation_threshold": 7, + "image_layer_force_creation_period": "1m", "pitr_interval": "1m", "lagging_wal_timeout": "23m", "lazy_slru_download": True, diff --git a/test_runner/regress/test_branch_and_gc.py b/test_runner/regress/test_branch_and_gc.py index 8447c9bf2d..148f469a95 100644 --- a/test_runner/regress/test_branch_and_gc.py +++ b/test_runner/regress/test_branch_and_gc.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING import pytest from fixtures.common_types import Lsn, TimelineId from fixtures.log_helper import log +from fixtures.neon_fixtures import wait_for_last_flush_lsn from fixtures.pageserver.http import TimelineCreate406 from fixtures.utils import query_scalar, skip_in_debug_build @@ -162,6 +163,9 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): ) lsn = Lsn(res[2][0][0]) + # Wait for all WAL to reach the pageserver, so GC cutoff LSN is greater than `lsn`. + wait_for_last_flush_lsn(env, endpoint0, tenant, b0) + # Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the # branch creation task but the individual timeline GC iteration happens *after* # the branch creation task. diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 1570d40ae9..e67161c6b7 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -944,3 +944,78 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)" ) assert res[0][0] == 1 + + +# BEGIN_HADRON +def get_layer_map(env, tenant_shard_id, timeline_id, ps_id): + client = env.pageservers[ps_id].http_client() + layer_map = client.layer_map_info(tenant_shard_id, timeline_id) + image_layer_count = 0 + delta_layer_count = 0 + for layer in layer_map.historic_layers: + if layer.kind == "Image": + image_layer_count += 1 + elif layer.kind == "Delta": + delta_layer_count += 1 + return image_layer_count, delta_layer_count + + +def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder): + """ + Tests that page server can force creating new images if image creation timeout is enabled + """ + # use large knobs to disable L0 compaction/image creation except for the force image creation + tenant_conf = { + "compaction_threshold": "100", + "image_creation_threshold": "100", + "image_layer_creation_check_threshold": "1", + "checkpoint_distance": 10 * 1024, + "checkpoint_timeout": "1s", + "image_layer_force_creation_period": "1s", + # The lsn for forced image layer creations is calculated once every 10 minutes. + # Hence, drive compaction manually such that the test doesn't compute it at the + # wrong time. 
+ "compaction_period": "0s", + } + + # consider every tenant large to run the image layer generation check more eagerly + neon_env_builder.pageserver_config_override = ( + "image_layer_generation_large_timeline_threshold=0" + ) + + neon_env_builder.num_pageservers = 1 + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + endpoint = env.endpoints.create_start("main") + endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") + # Generate some rows. + for v in range(10): + endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))") + + # Sleep a bit such that the inserts are considered when calculating the forced image layer creation LSN. + time.sleep(2) + + def check_force_image_creation(): + ps_http = env.pageserver.http_client() + ps_http.timeline_compact(tenant_id, timeline_id) + image, delta = get_layer_map(env, tenant_id, timeline_id, 0) + log.info(f"images: {image}, deltas: {delta}") + assert image > 0 + + env.pageserver.assert_log_contains("forcing L0 compaction of") + env.pageserver.assert_log_contains("forcing image creation for partitioned range") + + wait_until(check_force_image_creation) + + endpoint.stop_and_destroy() + + env.pageserver.allowed_errors.append( + ".*created delta file of size.*larger than double of target.*" + ) + + +# END_HADRON diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index fe3b220c67..d7f78afac8 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -2,7 +2,6 @@ from __future__ import annotations import os import platform -import shutil import tarfile from enum import StrEnum from pathlib import Path @@ -31,27 +30,6 @@ if TYPE_CHECKING: from werkzeug.wrappers.request import Request -# use neon_env_builder_local fixture to override the default neon_env_builder fixture -# and use a test-specific pg_install instead of shared one -@pytest.fixture(scope="function") -def neon_env_builder_local( - neon_env_builder: NeonEnvBuilder, - test_output_dir: Path, - pg_distrib_dir: Path, -) -> NeonEnvBuilder: - test_local_pginstall = test_output_dir / "pg_install" - log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}") - - # We can't copy only the version that we are currently testing because other - # binaries like the storage controller need specific Postgres versions. 
shutil.copytree(pg_distrib_dir, test_local_pginstall) - - neon_env_builder.pg_distrib_dir = test_local_pginstall - log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}") - - return neon_env_builder - - @final class RemoteExtension(StrEnum): SQL_ONLY = "test_extension_sql_only" diff --git a/test_runner/regress/test_event_trigger_extension.py b/test_runner/regress/test_event_trigger_extension.py new file mode 100644 index 0000000000..ac4351dcd5 --- /dev/null +++ b/test_runner/regress/test_event_trigger_extension.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import shutil +from pathlib import Path +from typing import TYPE_CHECKING, cast + +import pytest +from fixtures.log_helper import log +from fixtures.paths import BASE_DIR + +if TYPE_CHECKING: + from pathlib import Path + + from fixtures.neon_fixtures import ( + NeonEnvBuilder, + ) + from fixtures.pg_version import PgVersion + + +# use neon_env_builder_local fixture to override the default neon_env_builder fixture +# and use a test-specific pg_install instead of shared one +@pytest.fixture(scope="function") +def neon_env_builder_event_trigger_extension( + neon_env_builder_local: NeonEnvBuilder, + test_output_dir: Path, + pg_version: PgVersion, +) -> NeonEnvBuilder: + test_local_pginstall = test_output_dir / "pg_install" + + # Now copy the SQL-only extension test_event_trigger_extension in the local + # pginstall extension directory on-disk + test_event_trigger_extension_dir = ( + BASE_DIR / "test_runner" / "regress" / "data" / "test_event_trigger_extension" + ) + + test_local_extension_dir = ( + test_local_pginstall / f"v{pg_version}" / "share" / "postgresql" / "extension" + ) + + log.info(f"copy {test_event_trigger_extension_dir} to {test_local_extension_dir}") + + for f in [ + test_event_trigger_extension_dir / "test_event_trigger_extension.control", + test_event_trigger_extension_dir / "test_event_trigger_extension--1.0.sql", + ]: + shutil.copy(f, test_local_extension_dir) + + return neon_env_builder_local + + +def test_event_trigger_extension(neon_env_builder_event_trigger_extension: NeonEnvBuilder): + """ + Test installing an extension that contains an Event Trigger. + + The Event Trigger function is owned by the extension owner, which at + CREATE EXTENSION is going to be the Postgres bootstrap user, per the + extension control file where both superuser = true and trusted = true. + + Also, this function is SECURITY DEFINER, to allow for making changes to + the extension SQL objects, in our case a sequence. + + This test makes sure that the event trigger function is fired correctly + by non-privileged user DDL actions such as CREATE TABLE.
+ """ + env = neon_env_builder_event_trigger_extension.init_start() + env.create_branch("test_event_trigger_extension") + + endpoint = env.endpoints.create_start("test_event_trigger_extension") + extension = "test_event_trigger_extension" + database = "test_event_trigger_extension" + + endpoint.safe_psql(f"CREATE DATABASE {database}") + endpoint.safe_psql(f"CREATE EXTENSION {extension}", dbname=database) + + # check that the extension is owned by the bootstrap superuser (cloud_admin) + pg_bootstrap_superuser_name = "cloud_admin" + with endpoint.connect(dbname=database) as pg_conn: + with pg_conn.cursor() as cur: + cur.execute( + f"select rolname from pg_roles r join pg_extension e on r.oid = e.extowner where extname = '{extension}'" + ) + owner = cast("tuple[str]", cur.fetchone())[0] + assert owner == pg_bootstrap_superuser_name, ( + f"extension {extension} is not owned by bootstrap user '{pg_bootstrap_superuser_name}'" + ) + + # test that the SQL-only Event Trigger (SECURITY DEFINER function) runs + # correctly now that the extension has been installed + # + # create table to trigger the event trigger, twice, check sequence count + with endpoint.connect(dbname=database) as pg_conn: + log.info("creating SQL objects (tables)") + with pg_conn.cursor() as cur: + cur.execute("CREATE TABLE foo1(id int primary key)") + cur.execute("CREATE TABLE foo2(id int)") + + cur.execute("SELECT event_trigger.get_schema_version()") + res = cast("tuple[int]", cur.fetchone()) + ver = res[0] + + log.info(f"schema version is now {ver}") + assert ver == 2, "schema version is not 2" diff --git a/test_runner/regress/test_lfc_prewarm.py b/test_runner/regress/test_lfc_prewarm.py index ae36bbda79..22e5bf576f 100644 --- a/test_runner/regress/test_lfc_prewarm.py +++ b/test_runner/regress/test_lfc_prewarm.py @@ -1,6 +1,7 @@ import random import threading from enum import StrEnum +from time import sleep from typing import Any import pytest @@ -24,18 +25,7 @@ OFFLOAD_LABEL = "compute_ctl_lfc_offloads_total" OFFLOAD_ERR_LABEL = "compute_ctl_lfc_offload_errors_total" METHOD_VALUES = [e for e in PrewarmMethod] METHOD_IDS = [e.value for e in PrewarmMethod] - - -def check_pinned_entries(cur: Cursor): - """ - Wait till none of LFC buffers are pinned - """ - - def none_pinned(): - cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'") - assert cur.fetchall()[0][0] == 0 - - wait_until(none_pinned) +AUTOOFFLOAD_INTERVAL_SECS = 2 def prom_parse(client: EndpointHttpClient) -> dict[str, float]: @@ -49,9 +39,18 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]: def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any: + if method == PrewarmMethod.POSTGRES: + cur.execute("select get_local_cache_state()") + return cur.fetchall()[0][0] + if method == PrewarmMethod.AUTOPREWARM: + # With autoprewarm, we need to be sure LFC was offloaded after all writes + # finish, so we sleep. 
Otherwise we'll have fewer prewarmed pages than we want + sleep(AUTOOFFLOAD_INTERVAL_SECS) client.offload_lfc_wait() - elif method == PrewarmMethod.COMPUTE_CTL: + return + + if method == PrewarmMethod.COMPUTE_CTL: status = client.prewarm_lfc_status() assert status["status"] == "not_prewarmed" assert "error" not in status @@ -60,11 +59,9 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) parsed = prom_parse(client) desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0} assert parsed == desired, f"{parsed=} != {desired=}" - elif method == PrewarmMethod.POSTGRES: - cur.execute("select get_local_cache_state()") - return cur.fetchall()[0][0] - else: - raise AssertionError(f"{method} not in PrewarmMethod") + return + + raise AssertionError(f"{method} not in PrewarmMethod") def prewarm_endpoint( @@ -106,14 +103,13 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): "neon.file_cache_size_limit=1GB", "neon.file_cache_prewarm_limit=1000", ] - offload_secs = 2 if method == PrewarmMethod.AUTOPREWARM: endpoint = env.endpoints.create_start( branch_name="main", config_lines=cfg, autoprewarm=True, - offload_lfc_interval_seconds=offload_secs, + offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS, ) else: endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg) @@ -135,7 +131,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): endpoint.stop() if method == PrewarmMethod.AUTOPREWARM: - endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=offload_secs) + endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS) else: endpoint.start() @@ -162,7 +158,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod): lfc_cur.execute("select sum(pk) from t") assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2 - check_pinned_entries(pg_cur) desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped} check_prewarmed(method, client, desired) @@ -243,9 +238,9 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet prewarm_thread.start() def prewarmed(): - assert n_prewarms > 5 + assert n_prewarms > 3 - wait_until(prewarmed) + wait_until(prewarmed, timeout=40) # debug builds don't finish in 20s running = False for t in workload_threads: @@ -256,7 +251,6 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet total_balance = lfc_cur.fetchall()[0][0] assert total_balance == 0 - check_pinned_entries(pg_cur) if method == PrewarmMethod.POSTGRES: return desired = {