From 9b65946566fc4e4b095cacab56f1843e0679eda0 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 3 May 2024 16:45:24 +0100 Subject: [PATCH 1/5] proxy: add connect compute concurrency lock (#7607) ## Problem Too many connect_compute attempts can overwhelm postgres, getting the connections stuck. ## Summary of changes Limit number of connection attempts that can happen at a given time. --- proxy/src/bin/proxy.rs | 46 +++++++++++++++++++++--------- proxy/src/compute.rs | 11 +++++++ proxy/src/config.rs | 18 ++++++++---- proxy/src/console/provider.rs | 13 ++++----- proxy/src/console/provider/neon.rs | 8 +++--- proxy/src/lib.rs | 3 ++ proxy/src/metrics.rs | 9 ++++++ proxy/src/proxy.rs | 5 +++- proxy/src/proxy/connect_compute.rs | 8 +++++- proxy/src/serverless/backend.rs | 26 +++++++++++++++-- 10 files changed, 112 insertions(+), 35 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 39f6bc8b6d..0956aae6c0 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -118,8 +118,11 @@ struct ProxyCliArgs { #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)] wake_compute_cache: String, /// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable). - #[clap(long, default_value = config::WakeComputeLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)] + #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)] wake_compute_lock: String, + /// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable). + #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)] + connect_compute_lock: String, /// Allow self-signed certificates for compute nodes (for testing) #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)] allow_self_signed_compute: bool, @@ -529,24 +532,21 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { endpoint_cache_config, ))); - let config::WakeComputeLockOptions { + let config::ConcurrencyLockOptions { shards, permits, epoch, timeout, } = args.wake_compute_lock.parse()?; info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)"); - let locks = Box::leak(Box::new( - console::locks::ApiLocks::new( - "wake_compute_lock", - permits, - shards, - timeout, - epoch, - &Metrics::get().wake_compute_lock, - ) - .unwrap(), - )); + let locks = Box::leak(Box::new(console::locks::ApiLocks::new( + "wake_compute_lock", + permits, + shards, + timeout, + epoch, + &Metrics::get().wake_compute_lock, + )?)); tokio::spawn(locks.garbage_collect_worker()); let url = args.auth_endpoint.parse()?; @@ -572,6 +572,23 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { auth::BackendType::Link(MaybeOwned::Owned(url), ()) } }; + + let config::ConcurrencyLockOptions { + shards, + permits, + epoch, + timeout, + } = args.connect_compute_lock.parse()?; + info!(permits, shards, ?epoch, "Using NodeLocks (connect_compute)"); + let connect_compute_locks = console::locks::ApiLocks::new( + "connect_compute_lock", + permits, + shards, + timeout, + epoch, + &Metrics::get().proxy.connect_compute_lock, + )?; + let http_config = HttpConfig { request_timeout: args.sql_over_http.sql_over_http_timeout, pool_options: GlobalConnPoolOptions { @@ -607,11 +624,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { region: args.region.clone(), aws_region: args.aws_region.clone(), wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?, + connect_compute_locks, connect_to_compute_retry_config: config::RetryConfig::parse( &args.connect_to_compute_retry, )?, })); + tokio::spawn(config.connect_compute_locks.garbage_collect_worker()); + Ok(config) } diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 44d85c2952..23266ac4ef 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -6,6 +6,7 @@ use crate::{ error::{ReportableError, UserFacingError}, metrics::{Metrics, NumDbConnectionsGuard}, proxy::neon_option, + Host, }; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; @@ -101,6 +102,16 @@ impl ConnCfg { } } + pub fn get_host(&self) -> Result { + match self.0.get_hosts() { + [tokio_postgres::config::Host::Tcp(s)] => Ok(s.into()), + // we should not have multiple address or unix addresses. + _ => Err(WakeComputeError::BadComputeAddress( + "invalid compute address".into(), + )), + } + } + /// Apply startup message params to the connection config. pub fn set_startup_params(&mut self, params: &StartupMessageParams) { // Only set `user` if it's not present in the config. diff --git a/proxy/src/config.rs b/proxy/src/config.rs index a32ab8c43c..0c8e284d0b 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,7 +1,9 @@ use crate::{ auth::{self, backend::AuthRateLimiter}, + console::locks::ApiLocks, rate_limiter::RateBucketInfo, serverless::GlobalConnPoolOptions, + Host, }; use anyhow::{bail, ensure, Context, Ok}; use itertools::Itertools; @@ -34,6 +36,7 @@ pub struct ProxyConfig { pub handshake_timeout: Duration, pub aws_region: String, pub wake_compute_retry_config: RetryConfig, + pub connect_compute_locks: ApiLocks, pub connect_to_compute_retry_config: RetryConfig, } @@ -573,7 +576,7 @@ impl RetryConfig { } /// Helper for cmdline cache options parsing. -pub struct WakeComputeLockOptions { +pub struct ConcurrencyLockOptions { /// The number of shards the lock map should have pub shards: usize, /// The number of allowed concurrent requests for each endpoitn @@ -584,9 +587,12 @@ pub struct WakeComputeLockOptions { pub timeout: Duration, } -impl WakeComputeLockOptions { +impl ConcurrencyLockOptions { /// Default options for [`crate::console::provider::ApiLocks`]. pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0"; + /// Default options for [`crate::console::provider::ApiLocks`]. + pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str = + "shards=64,permits=50,epoch=10m,timeout=500ms"; // pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s"; @@ -636,7 +642,7 @@ impl WakeComputeLockOptions { } } -impl FromStr for WakeComputeLockOptions { +impl FromStr for ConcurrencyLockOptions { type Err = anyhow::Error; fn from_str(options: &str) -> Result { @@ -672,7 +678,7 @@ mod tests { #[test] fn test_parse_lock_options() -> anyhow::Result<()> { - let WakeComputeLockOptions { + let ConcurrencyLockOptions { epoch, permits, shards, @@ -683,7 +689,7 @@ mod tests { assert_eq!(shards, 32); assert_eq!(permits, 4); - let WakeComputeLockOptions { + let ConcurrencyLockOptions { epoch, permits, shards, @@ -694,7 +700,7 @@ mod tests { assert_eq!(shards, 16); assert_eq!(permits, 8); - let WakeComputeLockOptions { + let ConcurrencyLockOptions { epoch, permits, shards, diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index aa1800a9da..dfda29e0b1 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -17,7 +17,7 @@ use crate::{ scram, EndpointCacheKey, }; use dashmap::DashMap; -use std::{sync::Arc, time::Duration}; +use std::{hash::Hash, sync::Arc, time::Duration}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::info; @@ -447,16 +447,16 @@ impl ApiCaches { } /// Various caches for [`console`](super). -pub struct ApiLocks { +pub struct ApiLocks { name: &'static str, - node_locks: DashMap>, + node_locks: DashMap>, permits: usize, timeout: Duration, epoch: std::time::Duration, metrics: &'static ApiLockMetrics, } -impl ApiLocks { +impl ApiLocks { pub fn new( name: &'static str, permits: usize, @@ -475,10 +475,7 @@ impl ApiLocks { }) } - pub async fn get_wake_compute_permit( - &self, - key: &EndpointCacheKey, - ) -> Result { + pub async fn get_permit(&self, key: &K) -> Result { if self.permits == 0 { return Ok(WakeComputePermit { permit: None }); } diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 58b2a1570c..ec66641d01 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -13,7 +13,7 @@ use crate::{ http, metrics::{CacheOutcome, Metrics}, rate_limiter::EndpointRateLimiter, - scram, Normalize, + scram, EndpointCacheKey, Normalize, }; use crate::{cache::Cached, context::RequestMonitoring}; use futures::TryFutureExt; @@ -25,7 +25,7 @@ use tracing::{error, info, info_span, warn, Instrument}; pub struct Api { endpoint: http::Endpoint, pub caches: &'static ApiCaches, - pub locks: &'static ApiLocks, + pub locks: &'static ApiLocks, pub endpoint_rate_limiter: Arc, jwt: String, } @@ -35,7 +35,7 @@ impl Api { pub fn new( endpoint: http::Endpoint, caches: &'static ApiCaches, - locks: &'static ApiLocks, + locks: &'static ApiLocks, endpoint_rate_limiter: Arc, ) -> Self { let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") { @@ -289,7 +289,7 @@ impl super::Api for Api { return Err(WakeComputeError::TooManyConnections); } - let permit = self.locks.get_wake_compute_permit(&key).await?; + let permit = self.locks.get_permit(&key).await?; // after getting back a permit - it's possible the cache was filled // double check diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 3f6d985fe8..35c1616481 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -159,6 +159,9 @@ smol_str_wrapper!(EndpointCacheKey); smol_str_wrapper!(DbName); +// postgres hostname, will likely be a port:ip addr +smol_str_wrapper!(Host); + // Endpoints are a bit tricky. Rare they might be branches or projects. impl EndpointId { pub fn is_endpoint(&self) -> bool { diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 4a54857012..1590316925 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -126,6 +126,9 @@ pub struct ProxyMetrics { /// Number of events consumed from redis (per event type). pub redis_events_count: CounterVec>, + + #[metric(namespace = "connect_compute_lock")] + pub connect_compute_lock: ApiLockMetrics, } #[derive(MetricGroup)] @@ -149,6 +152,12 @@ impl Default for ProxyMetrics { } } +impl Default for ApiLockMetrics { + fn default() -> Self { + Self::new() + } +} + #[derive(FixedCardinalityLabel, Copy, Clone)] #[label(singleton = "direction")] pub enum HttpDirection { diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 33d73eb675..e4e095d77d 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -301,7 +301,10 @@ pub async fn handle_client( let mut node = connect_to_compute( ctx, - &TcpMechanism { params: ¶ms }, + &TcpMechanism { + params: ¶ms, + locks: &config.connect_compute_locks, + }, &user_info, mode.allow_self_signed_compute(config), config.wake_compute_retry_config, diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index da6223209f..c8528d0296 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -2,7 +2,7 @@ use crate::{ auth::backend::ComputeCredentialKeys, compute::{self, PostgresConnection}, config::RetryConfig, - console::{self, errors::WakeComputeError, CachedNodeInfo, NodeInfo}, + console::{self, errors::WakeComputeError, locks::ApiLocks, CachedNodeInfo, NodeInfo}, context::RequestMonitoring, error::ReportableError, metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType}, @@ -10,6 +10,7 @@ use crate::{ retry::{retry_after, ShouldRetry}, wake_compute::wake_compute, }, + Host, }; use async_trait::async_trait; use pq_proto::StartupMessageParams; @@ -64,6 +65,9 @@ pub trait ComputeConnectBackend { pub struct TcpMechanism<'a> { /// KV-dictionary with PostgreSQL connection params. pub params: &'a StartupMessageParams, + + /// connect_to_compute concurrency lock + pub locks: &'static ApiLocks, } #[async_trait] @@ -79,6 +83,8 @@ impl ConnectMechanism for TcpMechanism<'_> { node_info: &console::CachedNodeInfo, timeout: time::Duration, ) -> Result { + let host = node_info.config.get_host()?; + let _permit = self.locks.get_permit(&host).await?; node_info.connect(ctx, timeout).await } diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index c89ebc3251..963913a260 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -9,11 +9,13 @@ use crate::{ config::{AuthenticationConfig, ProxyConfig}, console::{ errors::{GetAuthInfoError, WakeComputeError}, + locks::ApiLocks, CachedNodeInfo, }, context::RequestMonitoring, error::{ErrorKind, ReportableError, UserFacingError}, - proxy::connect_compute::ConnectMechanism, + proxy::{connect_compute::ConnectMechanism, retry::ShouldRetry}, + Host, }; use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool}; @@ -105,6 +107,7 @@ impl PoolingBackend { conn_id, conn_info, pool: self.pool.clone(), + locks: &self.config.connect_compute_locks, }, &backend, false, // do not allow self signed compute for http flow @@ -154,16 +157,31 @@ impl UserFacingError for HttpConnError { } } +impl ShouldRetry for HttpConnError { + fn could_retry(&self) -> bool { + match self { + HttpConnError::ConnectionError(e) => e.could_retry(), + HttpConnError::ConnectionClosedAbruptly(_) => false, + HttpConnError::GetAuthInfo(_) => false, + HttpConnError::AuthError(_) => false, + HttpConnError::WakeCompute(_) => false, + } + } +} + struct TokioMechanism { pool: Arc>, conn_info: ConnInfo, conn_id: uuid::Uuid, + + /// connect_to_compute concurrency lock + locks: &'static ApiLocks, } #[async_trait] impl ConnectMechanism for TokioMechanism { type Connection = Client; - type ConnectError = tokio_postgres::Error; + type ConnectError = HttpConnError; type Error = HttpConnError; async fn connect_once( @@ -172,6 +190,9 @@ impl ConnectMechanism for TokioMechanism { node_info: &CachedNodeInfo, timeout: Duration, ) -> Result { + let host = node_info.config.get_host()?; + let permit = self.locks.get_permit(&host).await?; + let mut config = (*node_info.config).clone(); let config = config .user(&self.conn_info.user_info.user) @@ -182,6 +203,7 @@ impl ConnectMechanism for TokioMechanism { let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute); let (client, connection) = config.connect(tokio_postgres::NoTls).await?; drop(pause); + drop(permit); tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id())); Ok(poll_client( From ef03b38e5282140a5b7003c7f5010e1707631f31 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 3 May 2024 12:01:33 -0400 Subject: [PATCH 2/5] fix(pageserver): remove update_gc_info calls in tests (#7608) introduced by https://github.com/neondatabase/neon/pull/7468 conflicting with https://github.com/neondatabase/neon/pull/7584 Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c39c21c6dd..fdc49ae295 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5504,6 +5504,8 @@ mod tests { const NUM_KEYS: usize = 1000; const STEP: usize = 100; // random update + scan base_key + idx * STEP + let cancel = CancellationToken::new(); + let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap(); base_key.field1 = AUX_KEY_PREFIX; let mut test_key = base_key; @@ -5585,21 +5587,11 @@ mod tests { } // Perform a cycle of flush, compact, and GC - let cutoff = tline.get_last_record_lsn(); - tline - .update_gc_info( - Vec::new(), - cutoff, - Duration::ZERO, - &CancellationToken::new(), - &ctx, - ) - .await?; tline.freeze_and_flush().await?; - tline - .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) + tline.compact(&cancel, EnumSet::empty(), &ctx).await?; + tenant + .gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx) .await?; - tline.gc().await?; } Ok(()) From 1e7cd6ac9f3568ffe9db952cb89f8036330d27b5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 3 May 2024 19:15:38 +0200 Subject: [PATCH 3/5] refactor: move `NodeMetadata` to `pageserver_api`; use it from `neon_local` (#7606) This is the first step towards representing all of Pageserver configuration as clean `serde::Serialize`able Rust structs in `pageserver_api`. The `neon_local` code will then use those structs instead of the crude `toml_edit` / string concatenation that it does today. refs https://github.com/neondatabase/neon/issues/7555 --------- Co-authored-by: Alex Chi Z --- control_plane/src/bin/neon_local.rs | 8 +++---- control_plane/src/pageserver.rs | 13 ++++++----- libs/pageserver_api/src/config.rs | 31 +++++++++++++++++++++++++ libs/pageserver_api/src/config/tests.rs | 22 ++++++++++++++++++ libs/pageserver_api/src/lib.rs | 6 +---- pageserver/src/config.rs | 24 ++----------------- pageserver/src/control_plane_client.rs | 6 ++--- 7 files changed, 69 insertions(+), 41 deletions(-) create mode 100644 libs/pageserver_api/src/config.rs create mode 100644 libs/pageserver_api/src/config/tests.rs diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index bdd64c8687..14b83c1252 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -14,15 +14,15 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR}; use control_plane::safekeeper::SafekeeperNode; use control_plane::storage_controller::StorageController; use control_plane::{broker, local_env}; +use pageserver_api::config::{ + DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, + DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, +}; use pageserver_api::controller_api::PlacementPolicy; use pageserver_api::models::{ ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo, }; use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId}; -use pageserver_api::{ - DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, - DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, -}; use postgres_backend::AuthType; use postgres_connection::parse_host_port; use safekeeper_api::{ diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 52accc5890..1a64391306 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -248,12 +248,13 @@ impl PageServerNode { // situation: the metadata is written by some other script. std::fs::write( metadata_path, - serde_json::to_vec(&serde_json::json!({ - "host": "localhost", - "port": self.pg_connection_config.port(), - "http_host": "localhost", - "http_port": http_port, - })) + serde_json::to_vec(&pageserver_api::config::NodeMetadata { + postgres_host: "localhost".to_string(), + postgres_port: self.pg_connection_config.port(), + http_host: "localhost".to_string(), + http_port, + other: HashMap::new(), + }) .unwrap(), ) .expect("Failed to write metadata file"); diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs new file mode 100644 index 0000000000..d996a62349 --- /dev/null +++ b/libs/pageserver_api/src/config.rs @@ -0,0 +1,31 @@ +use std::collections::HashMap; + +use const_format::formatcp; + +#[cfg(test)] +mod tests; + +pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; +pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); +pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; +pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); + +// Certain metadata (e.g. externally-addressable name, AZ) is delivered +// as a separate structure. This information is not neeed by the pageserver +// itself, it is only used for registering the pageserver with the control +// plane and/or storage controller. +// +#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)] +pub struct NodeMetadata { + #[serde(rename = "host")] + pub postgres_host: String, + #[serde(rename = "port")] + pub postgres_port: u16, + pub http_host: String, + pub http_port: u16, + + // Deployment tools may write fields to the metadata file beyond what we + // use in this type: this type intentionally only names fields that require. + #[serde(flatten)] + pub other: HashMap, +} diff --git a/libs/pageserver_api/src/config/tests.rs b/libs/pageserver_api/src/config/tests.rs new file mode 100644 index 0000000000..edeefc156e --- /dev/null +++ b/libs/pageserver_api/src/config/tests.rs @@ -0,0 +1,22 @@ +use super::*; + +#[test] +fn test_node_metadata_v1_backward_compatibilty() { + let v1 = serde_json::to_vec(&serde_json::json!({ + "host": "localhost", + "port": 23, + "http_host": "localhost", + "http_port": 42, + })); + + assert_eq!( + serde_json::from_slice::(&v1.unwrap()).unwrap(), + NodeMetadata { + postgres_host: "localhost".to_string(), + postgres_port: 23, + http_host: "localhost".to_string(), + http_port: 42, + other: HashMap::new(), + } + ) +} diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index 1b948d60c3..532185a366 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -1,6 +1,5 @@ #![deny(unsafe_code)] #![deny(clippy::undocumented_unsafe_blocks)] -use const_format::formatcp; pub mod controller_api; pub mod key; @@ -11,7 +10,4 @@ pub mod shard; /// Public API types pub mod upcall_api; -pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; -pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); -pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; -pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); +pub mod config; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 96fff1f0c0..258eed0b12 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -9,7 +9,7 @@ use pageserver_api::shard::TenantShardId; use remote_storage::{RemotePath, RemoteStorageConfig}; use serde; use serde::de::IntoDeserializer; -use std::{collections::HashMap, env}; +use std::env; use storage_broker::Uri; use utils::crashsafe::path_with_suffix_extension; use utils::id::ConnectionId; @@ -51,7 +51,7 @@ pub mod defaults { use crate::tenant::config::defaults::*; use const_format::formatcp; - pub use pageserver_api::{ + pub use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_PG_LISTEN_PORT, }; @@ -335,26 +335,6 @@ impl BuilderValue { } } -// Certain metadata (e.g. externally-addressable name, AZ) is delivered -// as a separate structure. This information is not neeed by the pageserver -// itself, it is only used for registering the pageserver with the control -// plane and/or storage controller. -// -#[derive(serde::Deserialize)] -pub(crate) struct NodeMetadata { - #[serde(rename = "host")] - pub(crate) postgres_host: String, - #[serde(rename = "port")] - pub(crate) postgres_port: u16, - pub(crate) http_host: String, - pub(crate) http_port: u16, - - // Deployment tools may write fields to the metadata file beyond what we - // use in this type: this type intentionally only names fields that require. - #[serde(flatten)] - pub(crate) other: HashMap, -} - // needed to simplify config construction #[derive(Default)] struct PageServerConfigBuilder { diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index db0032891e..26e7cc7ef8 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -14,10 +14,8 @@ use tokio_util::sync::CancellationToken; use url::Url; use utils::{backoff, failpoint_support, generation::Generation, id::NodeId}; -use crate::{ - config::{NodeMetadata, PageServerConf}, - virtual_file::on_fatal_io_error, -}; +use crate::{config::PageServerConf, virtual_file::on_fatal_io_error}; +use pageserver_api::config::NodeMetadata; /// The Pageserver's client for using the control plane API: this is a small subset /// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md) From 64f0613edf47a6975f5d6394e5056cf2eaf7e484 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Fri, 3 May 2024 12:57:45 -0700 Subject: [PATCH 4/5] compute_ctl: Add support for swap resizing (#7434) Part of neondatabase/cloud#12047. Resolves #7239. In short, this PR: 1. Adds `ComputeSpec.swap_size_bytes: Option` 2. Adds a flag to compute_ctl: `--resize-swap-on-bind` 3. Implements running `/neonvm/bin/resize-swap` with the value from the compute spec before starting postgres, if both the value in the spec *AND* the flag are specified. 4. Adds `sudo` to the final image 5. Adds a file in `/etc/sudoers.d` to allow `compute_ctl` to resize swap Various bits of reasoning about design decisions in the added comments. In short: We have both a compute spec field and a flag to make rollout easier to implement. The flag will most likely be removed as part of cleanups for neondatabase/cloud#12047. --- compute_tools/src/bin/compute_ctl.rs | 86 +++++++++++++++++++++------- compute_tools/src/lib.rs | 1 + compute_tools/src/swap.rs | 36 ++++++++++++ control_plane/src/endpoint.rs | 1 + libs/compute_api/src/spec.rs | 17 ++++++ vm-image-spec.yaml | 22 +++++++ 6 files changed, 143 insertions(+), 20 deletions(-) create mode 100644 compute_tools/src/swap.rs diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 117919786e..471d46d4f2 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -47,7 +47,7 @@ use chrono::Utc; use clap::Arg; use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::{consts::SIGINT, iterator::Signals}; -use tracing::{error, info}; +use tracing::{error, info, warn}; use url::Url; use compute_api::responses::ComputeStatus; @@ -62,6 +62,7 @@ use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::spec::*; +use compute_tools::swap::resize_swap; // this is an arbitrary build tag. Fine as a default / for testing purposes // in-case of not-set environment var @@ -110,6 +111,7 @@ fn main() -> Result<()> { .expect("Postgres connection string is required"); let spec_json = matches.get_one::("spec"); let spec_path = matches.get_one::("spec-path"); + let resize_swap_on_bind = matches.get_flag("resize-swap-on-bind"); // Extract OpenTelemetry context for the startup actions from the // TRACEPARENT and TRACESTATE env variables, and attach it to the current @@ -275,33 +277,72 @@ fn main() -> Result<()> { "running compute with features: {:?}", state.pspec.as_ref().unwrap().spec.features ); + // before we release the mutex, fetch the swap size (if any) for later. + let swap_size_bytes = state.pspec.as_ref().unwrap().spec.swap_size_bytes; drop(state); // Launch remaining service threads let _monitor_handle = launch_monitor(&compute); let _configurator_handle = launch_configurator(&compute); - // Start Postgres + let mut prestartup_failed = false; let mut delay_exit = false; - let mut exit_code = None; - let pg = match compute.start_compute(extension_server_port) { - Ok(pg) => Some(pg), - Err(err) => { - error!("could not start the compute node: {:#}", err); - let mut state = compute.state.lock().unwrap(); - state.error = Some(format!("{:?}", err)); - state.status = ComputeStatus::Failed; - // Notify others that Postgres failed to start. In case of configuring the - // empty compute, it's likely that API handler is still waiting for compute - // state change. With this we will notify it that compute is in Failed state, - // so control plane will know about it earlier and record proper error instead - // of timeout. - compute.state_changed.notify_all(); - drop(state); // unlock - delay_exit = true; - None + + // Resize swap to the desired size if the compute spec says so + if let (Some(size_bytes), true) = (swap_size_bytes, resize_swap_on_bind) { + // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion + // *before* starting postgres. + // + // In theory, we could do this asynchronously if SkipSwapon was enabled for VMs, but this + // carries a risk of introducing hard-to-debug issues - e.g. if postgres sometimes gets + // OOM-killed during startup because swap wasn't available yet. + match resize_swap(size_bytes) { + Ok(()) => { + let size_gib = size_bytes as f32 / (1 << 20) as f32; // just for more coherent display. + info!(%size_bytes, %size_gib, "resized swap"); + } + Err(err) => { + let err = err.context("failed to resize swap"); + error!("{err:#}"); + + // Mark compute startup as failed; don't try to start postgres, and report this + // error to the control plane when it next asks. + prestartup_failed = true; + let mut state = compute.state.lock().unwrap(); + state.error = Some(format!("{err:?}")); + state.status = ComputeStatus::Failed; + compute.state_changed.notify_all(); + delay_exit = true; + } } - }; + } + + // Start Postgres + let mut pg = None; + let mut exit_code = None; + + if !prestartup_failed { + pg = match compute.start_compute(extension_server_port) { + Ok(pg) => Some(pg), + Err(err) => { + error!("could not start the compute node: {:#}", err); + let mut state = compute.state.lock().unwrap(); + state.error = Some(format!("{:?}", err)); + state.status = ComputeStatus::Failed; + // Notify others that Postgres failed to start. In case of configuring the + // empty compute, it's likely that API handler is still waiting for compute + // state change. With this we will notify it that compute is in Failed state, + // so control plane will know about it earlier and record proper error instead + // of timeout. + compute.state_changed.notify_all(); + drop(state); // unlock + delay_exit = true; + None + } + }; + } else { + warn!("skipping postgres startup because pre-startup step failed"); + } // Start the vm-monitor if directed to. The vm-monitor only runs on linux // because it requires cgroups. @@ -526,6 +567,11 @@ fn cli() -> clap::Command { ) .value_name("FILECACHE_CONNSTR"), ) + .arg( + Arg::new("resize-swap-on-bind") + .long("resize-swap-on-bind") + .action(clap::ArgAction::SetTrue), + ) } /// When compute_ctl is killed, send also termination signal to sync-safekeepers diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 4e01ffd954..eac808385c 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -14,4 +14,5 @@ pub mod monitor; pub mod params; pub mod pg_helpers; pub mod spec; +pub mod swap; pub mod sync_sk; diff --git a/compute_tools/src/swap.rs b/compute_tools/src/swap.rs new file mode 100644 index 0000000000..c22b6bc14e --- /dev/null +++ b/compute_tools/src/swap.rs @@ -0,0 +1,36 @@ +use anyhow::{anyhow, Context}; +use tracing::warn; + +pub const RESIZE_SWAP_BIN: &str = "/neonvm/bin/resize-swap"; + +pub fn resize_swap(size_bytes: u64) -> anyhow::Result<()> { + // run `/neonvm/bin/resize-swap --once {size_bytes}` + // + // Passing '--once' causes resize-swap to delete itself after successful completion, which + // means that if compute_ctl restarts later, we won't end up calling 'swapoff' while + // postgres is running. + // + // NOTE: resize-swap is not very clever. If present, --once MUST be the first arg. + let child_result = std::process::Command::new("/usr/bin/sudo") + .arg(RESIZE_SWAP_BIN) + .arg("--once") + .arg(size_bytes.to_string()) + .spawn(); + + if matches!(&child_result, Err(e) if e.kind() == std::io::ErrorKind::NotFound) { + warn!("ignoring \"not found\" error from resize-swap to avoid swapoff while compute is running"); + return Ok(()); + } + + child_result + .context("spawn() failed") + .and_then(|mut child| child.wait().context("wait() failed")) + .and_then(|status| match status.success() { + true => Ok(()), + false => Err(anyhow!("process exited with {status}")), + }) + // wrap any prior error with the overall context that we couldn't run the command + .with_context(|| { + format!("could not run `/usr/bin/sudo {RESIZE_SWAP_BIN} --once {size_bytes}`") + }) +} diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 03f7db99fb..20371e1cb8 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -554,6 +554,7 @@ impl Endpoint { format_version: 1.0, operation_uuid: None, features: self.features.clone(), + swap_size_bytes: None, cluster: Cluster { cluster_id: None, // project ID: not used name: None, // project name: not used diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 71ae66c45c..1c4ee2089f 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -33,6 +33,23 @@ pub struct ComputeSpec { #[serde(default)] pub features: Vec, + /// If compute_ctl was passed `--resize-swap-on-bind`, a value of `Some(_)` instructs + /// compute_ctl to `/neonvm/bin/resize-swap` with the given size, when the spec is first + /// received. + /// + /// Both this field and `--resize-swap-on-bind` are required, so that the control plane's + /// spec generation doesn't need to be aware of the actual compute it's running on, while + /// guaranteeing gradual rollout of swap. Otherwise, without `--resize-swap-on-bind`, we could + /// end up trying to resize swap in VMs without it -- or end up *not* resizing swap, thus + /// giving every VM much more swap than it should have (32GiB). + /// + /// Eventually we may remove `--resize-swap-on-bind` and exclusively use `swap_size_bytes` for + /// enabling the swap resizing behavior once rollout is complete. + /// + /// See neondatabase/cloud#12047 for more. + #[serde(default)] + pub swap_size_bytes: Option, + /// Expected cluster state at the end of transition process. pub cluster: Cluster, pub delta_operations: Option>, diff --git a/vm-image-spec.yaml b/vm-image-spec.yaml index 3ccdf5cc64..41ca16f16b 100644 --- a/vm-image-spec.yaml +++ b/vm-image-spec.yaml @@ -5,6 +5,12 @@ commands: user: root sysvInitAction: sysinit shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664' + # restrict permissions on /neonvm/bin/resize-swap, because we grant access to compute_ctl for + # running it as root. + - name: chmod-resize-swap + user: root + sysvInitAction: sysinit + shell: 'chmod 711 /neonvm/bin/resize-swap' - name: pgbouncer user: postgres sysvInitAction: respawn @@ -24,6 +30,11 @@ commands: shutdownHook: | su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10' files: + - filename: compute_ctl-resize-swap + content: | + # Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap + # as root without requiring entering a password (NOPASSWD), regardless of hostname (ALL) + postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap - filename: pgbouncer.ini content: | [databases] @@ -353,6 +364,17 @@ merge: | && echo 'root - nofile 1048576' >>/etc/security/limits.conf \ ) + # Allow postgres user (compute_ctl) to run swap resizer. + # Need to install sudo in order to allow this. + # + # Also, remove the 'read' permission from group/other on /neonvm/bin/resize-swap, just to be safe. + RUN set -e \ + && apt update \ + && apt install --no-install-recommends -y \ + sudo \ + && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + COPY compute_ctl-resize-swap /etc/sudoers.d/compute_ctl-resize-swap + COPY cgconfig.conf /etc/cgconfig.conf COPY pgbouncer.ini /etc/pgbouncer.ini COPY sql_exporter.yml /etc/sql_exporter.yml From 4deb8dc52ec26ab59a4d0b7ff548ef389e6717f9 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 4 May 2024 08:44:18 +0300 Subject: [PATCH 5/5] compute_ctl: Be more precise in how startup time is calculated (#7601) - On a non-pooled start, do not reset the 'start_time' after launching the HTTP service. In a non-pooled start, it's fair to include that in the total startup time. - When setting wait_for_spec_ms and resetting start_time, call Utc::now() only once. It's a waste of cycles to call it twice, but also, it ensures the time between setting wait_for_spec_ms and resetting start_time is included in one or the other time period. These differences should be insignificant in practice, in the microsecond range, but IMHO it seems more logical and readable this way too. Also fix and clarify some of the surrounding comments. (This caught my eye while reviewing PR #7577) --- compute_tools/src/bin/compute_ctl.rs | 29 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 471d46d4f2..67c5250376 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -228,14 +228,14 @@ fn main() -> Result<()> { // If this is a pooled VM, prewarm before starting HTTP server and becoming // available for binding. Prewarming helps Postgres start quicker later, - // because QEMU will already have it's memory allocated from the host, and + // because QEMU will already have its memory allocated from the host, and // the necessary binaries will already be cached. if !spec_set { compute.prewarm_postgres()?; } - // Launch http service first, so we were able to serve control-plane - // requests, while configuration is still in progress. + // Launch http service first, so that we can serve control-plane requests + // while configuration is still in progress. let _http_handle = launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread"); @@ -255,21 +255,22 @@ fn main() -> Result<()> { break; } } + + // Record for how long we slept waiting for the spec. + let now = Utc::now(); + state.metrics.wait_for_spec_ms = now + .signed_duration_since(state.start_time) + .to_std() + .unwrap() + .as_millis() as u64; + + // Reset start time, so that the total startup time that is calculated later will + // not include the time that we waited for the spec. + state.start_time = now; } // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); - - // Record for how long we slept waiting for the spec. - state.metrics.wait_for_spec_ms = Utc::now() - .signed_duration_since(state.start_time) - .to_std() - .unwrap() - .as_millis() as u64; - // Reset start time to the actual start of the configuration, so that - // total startup time was properly measured at the end. - state.start_time = Utc::now(); - state.status = ComputeStatus::Init; compute.state_changed.notify_all();