From 7c7180a79dbda2764d883392a73950acf114b63f Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Thu, 20 Feb 2025 17:14:16 +0000 Subject: [PATCH 01/11] Fix deadlock in drop_subscriptions_before_start (#10806) ALTER SUBSCRIPTION requires AccessExclusive lock which conflicts with iteration over pg_subscription when multiple databases are present and operations are applied concurrently. Fix by explicitly locking pg_subscription in the beginning of the transaction in each database. ## Problem https://github.com/neondatabase/cloud/issues/24292 --- compute_tools/src/sql/drop_subscriptions.sql | 1 + control_plane/src/endpoint.rs | 46 ++++- libs/compute_api/src/spec.rs | 8 +- .../regress/test_subscriber_branching.py | 173 +++++++++++++++++- 4 files changed, 220 insertions(+), 8 deletions(-) diff --git a/compute_tools/src/sql/drop_subscriptions.sql b/compute_tools/src/sql/drop_subscriptions.sql index dfb925e48e..03e8e158fa 100644 --- a/compute_tools/src/sql/drop_subscriptions.sql +++ b/compute_tools/src/sql/drop_subscriptions.sql @@ -2,6 +2,7 @@ DO $$ DECLARE subname TEXT; BEGIN + LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE; FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname); EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname); diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index c3c8229c38..c16b3cb017 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -59,6 +59,7 @@ use nix::sys::signal::Signal; use pageserver_api::shard::ShardStripeSize; use reqwest::header::CONTENT_TYPE; use serde::{Deserialize, Serialize}; +use tracing::debug; use url::Host; use utils::id::{NodeId, TenantId, TimelineId}; @@ -81,8 +82,10 @@ pub struct EndpointConf { internal_http_port: u16, pg_version: u32, skip_pg_catalog_updates: bool, + reconfigure_concurrency: usize, drop_subscriptions_before_start: bool, features: Vec, + cluster: Option, } // @@ -179,7 +182,9 @@ impl ComputeControlPlane { // we also skip catalog updates in the cloud. skip_pg_catalog_updates, drop_subscriptions_before_start, + reconfigure_concurrency: 1, features: vec![], + cluster: None, }); ep.create_endpoint_dir()?; @@ -196,7 +201,9 @@ impl ComputeControlPlane { pg_version, skip_pg_catalog_updates, drop_subscriptions_before_start, + reconfigure_concurrency: 1, features: vec![], + cluster: None, })?, )?; std::fs::write( @@ -261,8 +268,11 @@ pub struct Endpoint { skip_pg_catalog_updates: bool, drop_subscriptions_before_start: bool, + reconfigure_concurrency: usize, // Feature flags features: Vec, + // Cluster settings + cluster: Option, } #[derive(PartialEq, Eq)] @@ -302,6 +312,8 @@ impl Endpoint { let conf: EndpointConf = serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?; + debug!("serialized endpoint conf: {:?}", conf); + Ok(Endpoint { pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port), external_http_address: SocketAddr::new( @@ -319,8 +331,10 @@ impl Endpoint { tenant_id: conf.tenant_id, pg_version: conf.pg_version, skip_pg_catalog_updates: conf.skip_pg_catalog_updates, + reconfigure_concurrency: conf.reconfigure_concurrency, drop_subscriptions_before_start: conf.drop_subscriptions_before_start, features: conf.features, + cluster: conf.cluster, }) } @@ -607,7 +621,7 @@ impl Endpoint { }; // Create spec file - let spec = ComputeSpec { + let mut spec = ComputeSpec { skip_pg_catalog_updates: self.skip_pg_catalog_updates, format_version: 1.0, operation_uuid: None, @@ -640,7 +654,7 @@ impl Endpoint { Vec::new() }, settings: None, - postgresql_conf: Some(postgresql_conf), + postgresql_conf: Some(postgresql_conf.clone()), }, delta_operations: None, tenant_id: Some(self.tenant_id), @@ -653,9 +667,35 @@ impl Endpoint { pgbouncer_settings: None, shard_stripe_size: Some(shard_stripe_size), local_proxy_config: None, - reconfigure_concurrency: 1, + reconfigure_concurrency: self.reconfigure_concurrency, drop_subscriptions_before_start: self.drop_subscriptions_before_start, }; + + // this strange code is needed to support respec() in tests + if self.cluster.is_some() { + debug!("Cluster is already set in the endpoint spec, using it"); + spec.cluster = self.cluster.clone().unwrap(); + + debug!("spec.cluster {:?}", spec.cluster); + + // fill missing fields again + if create_test_user { + spec.cluster.roles.push(Role { + name: PgIdent::from_str("test").unwrap(), + encrypted_password: None, + options: None, + }); + spec.cluster.databases.push(Database { + name: PgIdent::from_str("neondb").unwrap(), + owner: PgIdent::from_str("test").unwrap(), + options: None, + restrict_conn: false, + invalid: false, + }); + } + spec.cluster.postgresql_conf = Some(postgresql_conf); + } + let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 767a34bcbc..8fffae92fb 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -252,7 +252,7 @@ pub enum ComputeMode { Replica, } -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] pub struct Cluster { pub cluster_id: Option, pub name: Option, @@ -283,7 +283,7 @@ pub struct DeltaOp { /// Rust representation of Postgres role info with only those fields /// that matter for us. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct Role { pub name: PgIdent, pub encrypted_password: Option, @@ -292,7 +292,7 @@ pub struct Role { /// Rust representation of Postgres database info with only those fields /// that matter for us. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct Database { pub name: PgIdent, pub owner: PgIdent, @@ -308,7 +308,7 @@ pub struct Database { /// Common type representing both SQL statement params with or without value, /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config /// options like `wal_level = logical`. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct GenericOption { pub name: String, pub value: Option, diff --git a/test_runner/regress/test_subscriber_branching.py b/test_runner/regress/test_subscriber_branching.py index 849d4f024d..6175643389 100644 --- a/test_runner/regress/test_subscriber_branching.py +++ b/test_runner/regress/test_subscriber_branching.py @@ -1,9 +1,10 @@ from __future__ import annotations +import threading import time from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv +from fixtures.neon_fixtures import NeonEnv, logical_replication_sync from fixtures.utils import query_scalar, wait_until @@ -239,3 +240,173 @@ def test_subscriber_branching(neon_simple_env: NeonEnv): res = scur_postgres.fetchall() assert len(res) == 1 assert str(sub_child_2_timeline_id) == res[0][0] + + +def test_multiple_subscription_branching(neon_simple_env: NeonEnv): + """ + Test that compute_ctl can handle concurrent deletion of subscriptions in a multiple databases + """ + env = neon_simple_env + + NUMBER_OF_DBS = 5 + + # Create and start endpoint so that neon_local put all the generated + # stuff into the spec.json file. + endpoint = env.endpoints.create_start( + "main", + config_lines=[ + "max_replication_slots = 10", + "max_logical_replication_workers=10", + "max_worker_processes=10", + ], + ) + + TEST_DB_NAMES = [ + { + "name": "neondb", + "owner": "cloud_admin", + }, + { + "name": "publisher_db", + "owner": "cloud_admin", + }, + ] + + for i in range(NUMBER_OF_DBS): + TEST_DB_NAMES.append( + { + "name": f"db{i}", + "owner": "cloud_admin", + } + ) + + # Update the spec.json file to create the databases + # and reconfigure the endpoint to apply the changes. + endpoint.respec_deep( + **{ + "skip_pg_catalog_updates": False, + "cluster": { + "databases": TEST_DB_NAMES, + }, + } + ) + endpoint.reconfigure() + + connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''") + + # create table, replication and subscription for each of the databases + with endpoint.cursor(dbname="publisher_db") as publisher_cursor: + for i in range(NUMBER_OF_DBS): + publisher_cursor.execute(f"CREATE TABLE t{i}(a int)") + publisher_cursor.execute(f"CREATE PUBLICATION mypub{i} FOR TABLE t{i}") + publisher_cursor.execute( + f"select pg_catalog.pg_create_logical_replication_slot('mysub{i}', 'pgoutput');" + ) + publisher_cursor.execute(f"INSERT INTO t{i} VALUES ({i})") + + with endpoint.cursor(dbname=f"db{i}") as cursor: + cursor.execute(f"CREATE TABLE t{i}(a int)") + cursor.execute( + f"CREATE SUBSCRIPTION mysub{i} CONNECTION '{connstr}' PUBLICATION mypub{i} WITH (create_slot = false) " + ) + + # wait for the subscription to be active + for i in range(NUMBER_OF_DBS): + logical_replication_sync( + endpoint, + endpoint, + f"mysub{i}", + sub_dbname=f"db{i}", + pub_dbname="publisher_db", + ) + + # Check that replication is working + for i in range(NUMBER_OF_DBS): + with endpoint.cursor(dbname=f"db{i}") as cursor: + cursor.execute(f"SELECT * FROM t{i}") + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == i + + last_insert_lsn = query_scalar(cursor, "select pg_current_wal_insert_lsn();") + + def start_publisher_workload(table_num: int, duration: int): + start = time.time() + with endpoint.cursor(dbname="publisher_db") as cur: + while time.time() - start < duration: + cur.execute(f"INSERT INTO t{i} SELECT FROM generate_series(1,1000)") + + LOAD_DURATION = 5 + threads = [ + threading.Thread(target=start_publisher_workload, args=(i, LOAD_DURATION)) + for i in range(NUMBER_OF_DBS) + ] + + for thread in threads: + thread.start() + + sub_child_1_timeline_id = env.create_branch( + "subscriber_child_1", + ancestor_branch_name="main", + ancestor_start_lsn=last_insert_lsn, + ) + + sub_child_1 = env.endpoints.create("subscriber_child_1") + + sub_child_1.respec( + skip_pg_catalog_updates=False, + reconfigure_concurrency=5, + drop_subscriptions_before_start=True, + cluster={ + "databases": TEST_DB_NAMES, + "roles": [], + }, + ) + + sub_child_1.start() + + # ensure that subscription deletion happened on this timeline + with sub_child_1.cursor() as scur_postgres: + scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done") + res = scur_postgres.fetchall() + log.info(f"res = {res}") + assert len(res) == 1 + assert str(sub_child_1_timeline_id) == res[0][0] + + # ensure that there are no subscriptions in the databases + for i in range(NUMBER_OF_DBS): + with sub_child_1.cursor(dbname=f"db{i}") as cursor: + cursor.execute("SELECT * FROM pg_catalog.pg_subscription") + res = cursor.fetchall() + assert len(res) == 0 + + # ensure that there are no unexpected rows in the tables + cursor.execute(f"SELECT * FROM t{i}") + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == i + + for thread in threads: + thread.join() + + # ensure that logical replication is still working in main endpoint + # wait for it to catch up + for i in range(NUMBER_OF_DBS): + logical_replication_sync( + endpoint, + endpoint, + f"mysub{i}", + sub_dbname=f"db{i}", + pub_dbname="publisher_db", + ) + + # verify that the data is the same in publisher and subscriber tables + with endpoint.cursor(dbname="publisher_db") as publisher_cursor: + for i in range(NUMBER_OF_DBS): + with endpoint.cursor(dbname=f"db{i}") as cursor: + publisher_cursor.execute(f"SELECT count(*) FROM t{i}") + cursor.execute(f"SELECT count(*) FROM t{i}") + pub_res = publisher_cursor.fetchone() + sub_res = cursor.fetchone() + log.info(f"for table t{i}: pub_res = {pub_res}, sub_res = {sub_res}") + assert pub_res == sub_res From e808e9432af8ec6809cf97de577ff4e2a466fd02 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Thu, 20 Feb 2025 21:16:04 +0400 Subject: [PATCH 02/11] storcon: use https for pageservers (#10759) ## Problem Storage controller uses unsecure http for pageserver API. Closes: https://github.com/neondatabase/cloud/issues/23734 Closes: https://github.com/neondatabase/cloud/issues/24091 ## Summary of changes - Add an optional `listen_https_port` field to storage controller's Node state and its API (RegisterNode/ListNodes/etc). - Allow updating `listen_https_port` on node registration to gradually add https port for all nodes. - Add `use_https_pageserver_api` CLI option to storage controller to enable https. - Pageserver doesn't support https for now and always reports `https_port=None`. This will be addressed in follow-up PR. --- control_plane/storcon_cli/src/main.rs | 5 ++ libs/pageserver_api/src/controller_api.rs | 3 + pageserver/src/controller_upcall_client.rs | 1 + .../down.sql | 1 + .../up.sql | 1 + storage_controller/src/main.rs | 5 ++ storage_controller/src/node.rs | 60 +++++++++++-- storage_controller/src/persistence.rs | 40 ++++++++- storage_controller/src/scheduler.rs | 5 +- storage_controller/src/schema.rs | 1 + storage_controller/src/service.rs | 85 +++++++++++++++---- test_runner/fixtures/neon_fixtures.py | 2 + .../regress/test_storage_controller.py | 53 ++++++++++++ 13 files changed, 231 insertions(+), 31 deletions(-) create mode 100644 storage_controller/migrations/2025-02-11-144848_pageserver_use_https/down.sql create mode 100644 storage_controller/migrations/2025-02-11-144848_pageserver_use_https/up.sql diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 3c574efc63..953ade83ad 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -47,6 +47,9 @@ enum Command { listen_http_addr: String, #[arg(long)] listen_http_port: u16, + #[arg(long)] + listen_https_port: Option, + #[arg(long)] availability_zone_id: String, }, @@ -394,6 +397,7 @@ async fn main() -> anyhow::Result<()> { listen_pg_port, listen_http_addr, listen_http_port, + listen_https_port, availability_zone_id, } => { storcon_client @@ -406,6 +410,7 @@ async fn main() -> anyhow::Result<()> { listen_pg_port, listen_http_addr, listen_http_port, + listen_https_port, availability_zone_id: AvailabilityZone(availability_zone_id), }), ) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 42f6e47e63..f94bfab581 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -57,6 +57,7 @@ pub struct NodeRegisterRequest { pub listen_http_addr: String, pub listen_http_port: u16, + pub listen_https_port: Option, pub availability_zone_id: AvailabilityZone, } @@ -105,6 +106,7 @@ pub struct TenantLocateResponseShard { pub listen_http_addr: String, pub listen_http_port: u16, + pub listen_https_port: Option, } #[derive(Serialize, Deserialize)] @@ -148,6 +150,7 @@ pub struct NodeDescribeResponse { pub listen_http_addr: String, pub listen_http_port: u16, + pub listen_https_port: Option, pub listen_pg_addr: String, pub listen_pg_port: u16, diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index d41bfd9021..4990f17b40 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -173,6 +173,7 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient { listen_pg_port: m.postgres_port, listen_http_addr: m.http_host, listen_http_port: m.http_port, + listen_https_port: None, // TODO: Support https. availability_zone_id: az_id.expect("Checked above"), }) } diff --git a/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/down.sql b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/down.sql new file mode 100644 index 0000000000..0f051d3ac3 --- /dev/null +++ b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/down.sql @@ -0,0 +1 @@ +ALTER TABLE nodes DROP listen_https_port; diff --git a/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/up.sql b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/up.sql new file mode 100644 index 0000000000..172237d477 --- /dev/null +++ b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/up.sql @@ -0,0 +1 @@ +ALTER TABLE nodes ADD listen_https_port INTEGER; diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 9a9958f7a6..be074d269d 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -126,6 +126,10 @@ struct Cli { #[arg(long)] long_reconcile_threshold: Option, + + // Flag to use https for requests to pageserver API. + #[arg(long, default_value = "false")] + use_https_pageserver_api: bool, } enum StrictMode { @@ -321,6 +325,7 @@ async fn async_main() -> anyhow::Result<()> { address_for_peers: args.address_for_peers, start_as_candidate: args.start_as_candidate, http_service_port: args.listen.port() as i32, + use_https_pageserver_api: args.use_https_pageserver_api, }; // Validate that we can connect to the database diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index f5c2d329e0..3762d13c10 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -1,5 +1,6 @@ use std::{str::FromStr, time::Duration}; +use anyhow::anyhow; use pageserver_api::{ controller_api::{ AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, @@ -32,12 +33,16 @@ pub(crate) struct Node { listen_http_addr: String, listen_http_port: u16, + listen_https_port: Option, listen_pg_addr: String, listen_pg_port: u16, availability_zone_id: AvailabilityZone, + // Flag from storcon's config to use https for pageserver admin API. + // Invariant: if |true|, listen_https_port should contain a value. + use_https: bool, // This cancellation token means "stop any RPCs in flight to this node, and don't start // any more". It is not related to process shutdown. #[serde(skip)] @@ -56,7 +61,16 @@ pub(crate) enum AvailabilityTransition { impl Node { pub(crate) fn base_url(&self) -> String { - format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) + if self.use_https { + format!( + "https://{}:{}", + self.listen_http_addr, + self.listen_https_port + .expect("https port should be specified if use_https is on") + ) + } else { + format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) + } } pub(crate) fn get_id(&self) -> NodeId { @@ -82,11 +96,20 @@ impl Node { self.id == register_req.node_id && self.listen_http_addr == register_req.listen_http_addr && self.listen_http_port == register_req.listen_http_port + // Note: listen_https_port may change. See [`Self::need_update`] for mode details. + // && self.listen_https_port == register_req.listen_https_port && self.listen_pg_addr == register_req.listen_pg_addr && self.listen_pg_port == register_req.listen_pg_port && self.availability_zone_id == register_req.availability_zone_id } + // Do we need to update an existing record in DB on this registration request? + pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool { + // listen_https_port is checked here because it may change during migration to https. + // After migration, this check may be moved to registration_match. + self.listen_https_port != register_req.listen_https_port + } + /// For a shard located on this node, populate a response object /// with this node's address information. pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard { @@ -95,6 +118,7 @@ impl Node { node_id: self.id, listen_http_addr: self.listen_http_addr.clone(), listen_http_port: self.listen_http_port, + listen_https_port: self.listen_https_port, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port, } @@ -175,25 +199,34 @@ impl Node { } } + #[allow(clippy::too_many_arguments)] pub(crate) fn new( id: NodeId, listen_http_addr: String, listen_http_port: u16, + listen_https_port: Option, listen_pg_addr: String, listen_pg_port: u16, availability_zone_id: AvailabilityZone, - ) -> Self { - Self { + use_https: bool, + ) -> anyhow::Result { + if use_https && listen_https_port.is_none() { + return Err(anyhow!("https is enabled, but node has no https port")); + } + + Ok(Self { id, listen_http_addr, listen_http_port, + listen_https_port, listen_pg_addr, listen_pg_port, scheduling: NodeSchedulingPolicy::Active, availability: NodeAvailability::Offline, availability_zone_id, + use_https, cancel: CancellationToken::new(), - } + }) } pub(crate) fn to_persistent(&self) -> NodePersistence { @@ -202,14 +235,19 @@ impl Node { scheduling_policy: self.scheduling.into(), listen_http_addr: self.listen_http_addr.clone(), listen_http_port: self.listen_http_port as i32, + listen_https_port: self.listen_https_port.map(|x| x as i32), listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port as i32, availability_zone_id: self.availability_zone_id.0.clone(), } } - pub(crate) fn from_persistent(np: NodePersistence) -> Self { - Self { + pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result { + if use_https && np.listen_https_port.is_none() { + return Err(anyhow!("https is enabled, but node has no https port")); + } + + Ok(Self { id: NodeId(np.node_id as u64), // At startup we consider a node offline until proven otherwise. availability: NodeAvailability::Offline, @@ -217,11 +255,13 @@ impl Node { .expect("Bad scheduling policy in DB"), listen_http_addr: np.listen_http_addr, listen_http_port: np.listen_http_port as u16, + listen_https_port: np.listen_https_port.map(|x| x as u16), listen_pg_addr: np.listen_pg_addr, listen_pg_port: np.listen_pg_port as u16, availability_zone_id: AvailabilityZone(np.availability_zone_id), + use_https, cancel: CancellationToken::new(), - } + }) } /// Wrapper for issuing requests to pageserver management API: takes care of generic @@ -285,8 +325,9 @@ impl Node { warn_threshold, max_retries, &format!( - "Call to node {} ({}:{}) management API", - self.id, self.listen_http_addr, self.listen_http_port + "Call to node {} ({}) management API", + self.id, + self.base_url(), ), cancel, ) @@ -302,6 +343,7 @@ impl Node { availability_zone_id: self.availability_zone_id.0.clone(), listen_http_addr: self.listen_http_addr.clone(), listen_http_port: self.listen_http_port, + listen_https_port: self.listen_https_port, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port, } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 67b60eadf3..459c11add9 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -375,18 +375,23 @@ impl Persistence { Ok(nodes) } - pub(crate) async fn update_node( + pub(crate) async fn update_node( &self, input_node_id: NodeId, - input_scheduling: NodeSchedulingPolicy, - ) -> DatabaseResult<()> { + values: V, + ) -> DatabaseResult<()> + where + V: diesel::AsChangeset + Clone + Send + Sync, + V::Changeset: diesel::query_builder::QueryFragment + Send, // valid Postgres SQL + { use crate::schema::nodes::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| { + let values = values.clone(); Box::pin(async move { let updated = diesel::update(nodes) .filter(node_id.eq(input_node_id.0 as i64)) - .set((scheduling_policy.eq(String::from(input_scheduling)),)) + .set(values) .execute(conn) .await?; Ok(updated) @@ -403,6 +408,32 @@ impl Persistence { } } + pub(crate) async fn update_node_scheduling_policy( + &self, + input_node_id: NodeId, + input_scheduling: NodeSchedulingPolicy, + ) -> DatabaseResult<()> { + use crate::schema::nodes::dsl::*; + self.update_node( + input_node_id, + scheduling_policy.eq(String::from(input_scheduling)), + ) + .await + } + + pub(crate) async fn update_node_on_registration( + &self, + input_node_id: NodeId, + input_https_port: Option, + ) -> DatabaseResult<()> { + use crate::schema::nodes::dsl::*; + self.update_node( + input_node_id, + listen_https_port.eq(input_https_port.map(|x| x as i32)), + ) + .await + } + /// At startup, load the high level state for shards, such as their config + policy. This will /// be enriched at runtime with state discovered on pageservers. /// @@ -1452,6 +1483,7 @@ pub(crate) struct NodePersistence { pub(crate) listen_pg_addr: String, pub(crate) listen_pg_port: i32, pub(crate) availability_zone_id: String, + pub(crate) listen_https_port: Option, } /// Tenant metadata health status that are stored durably. diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 106a7b2699..44936d018a 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -930,13 +930,16 @@ pub(crate) mod test_utils { NodeId(i), format!("httphost-{i}"), 80 + i as u16, + None, format!("pghost-{i}"), 5432 + i as u16, az_iter .next() .cloned() .unwrap_or(AvailabilityZone("test-az".to_string())), - ); + false, + ) + .unwrap(); node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0))); assert!(node.is_available()); node diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index 14c30c296d..361253bd19 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -26,6 +26,7 @@ diesel::table! { listen_pg_addr -> Varchar, listen_pg_port -> Int4, availability_zone_id -> Varchar, + listen_https_port -> Nullable, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index fc6d2f3d29..25a1cb4252 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -399,6 +399,8 @@ pub struct Config { pub http_service_port: i32, pub long_reconcile_threshold: Duration, + + pub use_https_pageserver_api: bool, } impl From for ApiError { @@ -1401,8 +1403,8 @@ impl Service { .list_nodes() .await? .into_iter() - .map(Node::from_persistent) - .collect::>(); + .map(|x| Node::from_persistent(x, config.use_https_pageserver_api)) + .collect::>>()?; let nodes: HashMap = nodes.into_iter().map(|n| (n.get_id(), n)).collect(); tracing::info!("Loaded {} nodes from database.", nodes.len()); metrics::METRICS_REGISTRY @@ -1501,10 +1503,13 @@ impl Service { NodeId(node_id as u64), "".to_string(), 123, + None, "".to_string(), 123, AvailabilityZone("test_az".to_string()), - ); + false, + ) + .unwrap(); scheduler.node_upsert(&node); } @@ -5907,8 +5912,10 @@ impl Service { ) .await; + #[derive(PartialEq)] enum RegistrationStatus { - Matched, + UpToDate, + NeedUpdate, Mismatched, New, } @@ -5917,7 +5924,11 @@ impl Service { let locked = self.inner.read().unwrap(); if let Some(node) = locked.nodes.get(®ister_req.node_id) { if node.registration_match(®ister_req) { - RegistrationStatus::Matched + if node.need_update(®ister_req) { + RegistrationStatus::NeedUpdate + } else { + RegistrationStatus::UpToDate + } } else { RegistrationStatus::Mismatched } @@ -5927,9 +5938,9 @@ impl Service { }; match registration_status { - RegistrationStatus::Matched => { + RegistrationStatus::UpToDate => { tracing::info!( - "Node {} re-registered with matching address", + "Node {} re-registered with matching address and is up to date", register_req.node_id ); @@ -5947,7 +5958,7 @@ impl Service { "Node is already registered with different address".to_string(), )); } - RegistrationStatus::New => { + RegistrationStatus::New | RegistrationStatus::NeedUpdate => { // fallthrough } } @@ -5976,6 +5987,16 @@ impl Service { )); } + if self.config.use_https_pageserver_api && register_req.listen_https_port.is_none() { + return Err(ApiError::PreconditionFailed( + format!( + "Node {} has no https port, but use_https is enabled", + register_req.node_id + ) + .into(), + )); + } + // Ordering: we must persist the new node _before_ adding it to in-memory state. // This ensures that before we use it for anything or expose it via any external // API, it is guaranteed to be available after a restart. @@ -5983,13 +6004,29 @@ impl Service { register_req.node_id, register_req.listen_http_addr, register_req.listen_http_port, + register_req.listen_https_port, register_req.listen_pg_addr, register_req.listen_pg_port, register_req.availability_zone_id.clone(), + self.config.use_https_pageserver_api, ); + let new_node = match new_node { + Ok(new_node) => new_node, + Err(error) => return Err(ApiError::InternalServerError(error)), + }; - // TODO: idempotency if the node already exists in the database - self.persistence.insert_node(&new_node).await?; + match registration_status { + RegistrationStatus::New => self.persistence.insert_node(&new_node).await?, + RegistrationStatus::NeedUpdate => { + self.persistence + .update_node_on_registration( + register_req.node_id, + register_req.listen_https_port, + ) + .await? + } + _ => unreachable!("Other statuses have been processed earlier"), + } let mut locked = self.inner.write().unwrap(); let mut new_nodes = (*locked.nodes).clone(); @@ -6004,12 +6041,24 @@ impl Service { .storage_controller_pageserver_nodes .set(locked.nodes.len() as i64); - tracing::info!( - "Registered pageserver {} ({}), now have {} pageservers", - register_req.node_id, - register_req.availability_zone_id, - locked.nodes.len() - ); + match registration_status { + RegistrationStatus::New => { + tracing::info!( + "Registered pageserver {} ({}), now have {} pageservers", + register_req.node_id, + register_req.availability_zone_id, + locked.nodes.len() + ); + } + RegistrationStatus::NeedUpdate => { + tracing::info!( + "Re-registered and updated node {} ({})", + register_req.node_id, + register_req.availability_zone_id, + ); + } + _ => unreachable!("Other statuses have been processed earlier"), + } Ok(()) } @@ -6027,7 +6076,9 @@ impl Service { if let Some(scheduling) = scheduling { // Scheduling is a persistent part of Node: we must write updates to the database before // applying them in memory - self.persistence.update_node(node_id, scheduling).await?; + self.persistence + .update_node_scheduling_policy(node_id, scheduling) + .await?; } // If we're activating a node, then before setting it active we must reconcile any shard locations diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 58c5dbfd29..36af522535 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1630,6 +1630,7 @@ def neon_env_builder( class PageserverPort: pg: int http: int + https: int | None = None class LogUtils: @@ -1886,6 +1887,7 @@ class NeonStorageController(MetricsGetter, LogUtils): "node_id": int(node.id), "listen_http_addr": "localhost", "listen_http_port": node.service_port.http, + "listen_https_port": node.service_port.https, "listen_pg_addr": "localhost", "listen_pg_port": node.service_port.pg, "availability_zone_id": node.az_id, diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 1d95312140..7e895422d2 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -3764,3 +3764,56 @@ def test_storage_controller_node_flap_detach_race( assert len(locs) == 1, f"{shard} has {len(locs)} attached locations" wait_until(validate_locations, timeout=10) + + +def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder): + """ + Check that storage controller handles node_register requests with updated fields correctly. + 1. Run storage controller and register 1 pageserver without https port. + 2. Register the same pageserver with https port. Check that port has been updated. + 3. Restart the storage controller. Check that https port is persistent. + 4. Register the same pageserver without https port again (rollback). Check that port has been removed. + """ + neon_env_builder.num_pageservers = 1 + env = neon_env_builder.init_configs() + + env.storage_controller.start() + env.storage_controller.wait_until_ready() + + pageserver = env.pageservers[0] + + # Step 1. Register pageserver without https port. + env.storage_controller.node_register(pageserver) + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] is None + + # Step 2. Register pageserver with https port. + pageserver.service_port.https = 1234 + env.storage_controller.node_register(pageserver) + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] == 1234 + + # Step 3. Restart storage controller. + env.storage_controller.stop() + env.storage_controller.start() + env.storage_controller.wait_until_ready() + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] == 1234 + + # Step 4. Register pageserver with no https port again. + pageserver.service_port.https = None + env.storage_controller.node_register(pageserver) + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] is None From f7474d3f4142d1a05dda0719b19037358f717bae Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Thu, 20 Feb 2025 11:31:42 -0600 Subject: [PATCH 03/11] Remove forward compatibility hacks related to compute HTTP servers (#10797) These hacks were added to appease the forward compatibility tests and can be removed. Signed-off-by: Tristan Partin --- compute_tools/src/bin/compute_ctl.rs | 15 ++++++--------- control_plane/src/endpoint.rs | 14 +++++--------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index a8803ec793..9193f06b3b 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -112,16 +112,13 @@ struct Cli { /// outside the compute will talk to the compute through this port. Keep /// the previous name for this argument around for a smoother release /// with the control plane. - /// - /// TODO: Remove the alias after the control plane release which teaches the - /// control plane about the renamed argument. - #[arg(long, alias = "http-port", default_value_t = 3080)] + #[arg(long, default_value_t = 3080)] pub external_http_port: u16, - /// The port to bind the internal listening HTTP server to. Clients like + /// The port to bind the internal listening HTTP server to. Clients include /// the neon extension (for installing remote extensions) and local_proxy. - #[arg(long)] - pub internal_http_port: Option, + #[arg(long, default_value_t = 3081)] + pub internal_http_port: u16, #[arg(short = 'D', long, value_name = "DATADIR")] pub pgdata: String, @@ -359,7 +356,7 @@ fn wait_spec( pgbin: cli.pgbin.clone(), pgversion: get_pg_version_string(&cli.pgbin), external_http_port: cli.external_http_port, - internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1), + internal_http_port: cli.internal_http_port, live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), @@ -383,7 +380,7 @@ fn wait_spec( // The internal HTTP server could be launched later, but there isn't much // sense in waiting. - Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute); + Server::Internal(cli.internal_http_port).launch(&compute); if !spec_set { // No spec provided, hang waiting for it. diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index c16b3cb017..c22ff20c70 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -713,18 +713,14 @@ impl Endpoint { println!("Also at '{}'", conn_str); } let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl")); - //cmd.args([ - // "--external-http-port", - // &self.external_http_address.port().to_string(), - //]) - //.args([ - // "--internal-http-port", - // &self.internal_http_address.port().to_string(), - //]) cmd.args([ - "--http-port", + "--external-http-port", &self.external_http_address.port().to_string(), ]) + .args([ + "--internal-http-port", + &self.internal_http_address.port().to_string(), + ]) .args(["--pgdata", self.pgdata().to_str().unwrap()]) .args(["--connstr", &conn_str]) .args([ From d571553d8aff2e9f16c8dbc1ad59370e27418eeb Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Thu, 20 Feb 2025 11:31:52 -0600 Subject: [PATCH 04/11] Remove hacks in compute_ctl related to compute ID (#10751) --- compute_tools/src/bin/compute_ctl.rs | 16 +----------- control_plane/src/endpoint.rs | 26 +++++++++---------- .../compute_wrapper/shell/compute.sh | 1 + 3 files changed, 14 insertions(+), 29 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 9193f06b3b..1cdae718fe 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -41,7 +41,6 @@ use std::process::exit; use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock}; -use std::time::SystemTime; use std::{thread, time::Duration}; use anyhow::{Context, Result}; @@ -86,19 +85,6 @@ fn parse_remote_ext_config(arg: &str) -> Result { } } -/// Generate a compute ID if one is not supplied. This exists to keep forward -/// compatibility tests working, but will be removed in a future iteration. -fn generate_compute_id() -> String { - let now = SystemTime::now(); - - format!( - "compute-{}", - now.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() - ) -} - #[derive(Parser)] #[command(rename_all = "kebab-case")] struct Cli { @@ -153,7 +139,7 @@ struct Cli { #[arg(short = 'S', long, group = "spec-path")] pub spec_path: Option, - #[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())] + #[arg(short = 'i', long, group = "compute-id")] pub compute_id: String, #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")] diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index c22ff20c70..407578abb8 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -46,6 +46,8 @@ use std::process::Command; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; use anyhow::{anyhow, bail, Context, Result}; use compute_api::requests::ConfigurationRequest; @@ -737,20 +739,16 @@ impl Endpoint { ]) // TODO: It would be nice if we generated compute IDs with the same // algorithm as the real control plane. - // - // TODO: Add this back when - // https://github.com/neondatabase/neon/pull/10747 is merged. - // - //.args([ - // "--compute-id", - // &format!( - // "compute-{}", - // SystemTime::now() - // .duration_since(UNIX_EPOCH) - // .unwrap() - // .as_secs() - // ), - //]) + .args([ + "--compute-id", + &format!( + "compute-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + ), + ]) .stdin(std::process::Stdio::null()) .stderr(logfile.try_clone()?) .stdout(logfile); diff --git a/docker-compose/compute_wrapper/shell/compute.sh b/docker-compose/compute_wrapper/shell/compute.sh index b4f8d3d66a..9dbdcce69f 100755 --- a/docker-compose/compute_wrapper/shell/compute.sh +++ b/docker-compose/compute_wrapper/shell/compute.sh @@ -77,4 +77,5 @@ echo "Start compute node" /usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \ -C "postgresql://cloud_admin@localhost:55433/postgres" \ -b /usr/local/bin/postgres \ + --compute-id "compute-$RANDOM" \ -S ${SPEC_FILE} From 34996416d65eed2377f3cbf8bd4559792c26a045 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 20 Feb 2025 17:49:05 +0000 Subject: [PATCH 05/11] pageserver: guard against WAL gaps in the interpreted protocol (#10858) ## Problem The interpreted SK <-> PS protocol does not guard against gaps (neither does the Vanilla one, but that's beside the point). ## Summary of changes Extend the protocol to include the start LSN of the PG WAL section from which the records were interpreted. Validation is enabled via a config flag on the pageserver and works as follows: **Case 1**: `raw_wal_start_lsn` is smaller than the requested LSN There can't be gaps here, but we check that the shard received records which it hasn't seen before. **Case 2**: `raw_wal_start_lsn` is equal to the requested LSN This is the happy case. No gap and nothing to check **Case 3**: `raw_wal_start_lsn` is greater than the requested LSN This is a gap. To make Case 3 work I had to bend the protocol a bit. We read record chunks of WAL which aren't record aligned and feed them to the decoder. The picture below shows a shard which subscribes at a position somewhere within Record 2. We already have a wal reader which is below that position so we wait to catch up. We read some wal in Read 1 (all of Record 1 and some of Record 2). The new shard doesn't need Record 1 (it has already processed it according to the starting position), but we read past it's starting position. When we do Read 2, we decode Record 2 and ship it off to the shard, but the starting position of Read 2 is greater than the starting position the shard requested. This looks like a gap. ![image](https://github.com/user-attachments/assets/8aed292e-5d62-46a3-9b01-fbf9dc25efe0) To make it work, we extend the protocol to send an empty `InterpretedWalRecords` to shards if the WAL the records originated from ends the requested start position. On the pageserver, that just updates the tracking LSNs in memory (no-op really). This gives us a workaround for the fake gap. As a drive by, make `InterpretedWalRecords::next_record_lsn` mandatory in the application level definition. It's always included. Related: https://github.com/neondatabase/cloud/issues/23935 --- libs/pageserver_api/src/config.rs | 3 + libs/wal_decoder/proto/interpreted_wal.proto | 1 + libs/wal_decoder/src/models.rs | 6 +- libs/wal_decoder/src/wire_format.rs | 9 ++- pageserver/src/bin/pageserver.rs | 1 + pageserver/src/config.rs | 6 ++ pageserver/src/tenant/timeline.rs | 1 + pageserver/src/tenant/timeline/walreceiver.rs | 1 + .../walreceiver/connection_manager.rs | 3 + .../walreceiver/walreceiver_connection.rs | 56 +++++++++++++++---- safekeeper/src/send_interpreted_wal.rs | 51 +++++++++++++---- test_runner/fixtures/neon_fixtures.py | 12 ++-- 12 files changed, 120 insertions(+), 30 deletions(-) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 0f33bcf45b..1aff5a7012 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -122,6 +122,8 @@ pub struct ConfigToml { pub page_service_pipelining: PageServicePipeliningConfig, pub get_vectored_concurrent_io: GetVectoredConcurrentIo, pub enable_read_path_debugging: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub validate_wal_contiguity: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -521,6 +523,7 @@ impl Default for ConfigToml { } else { None }, + validate_wal_contiguity: None, } } } diff --git a/libs/wal_decoder/proto/interpreted_wal.proto b/libs/wal_decoder/proto/interpreted_wal.proto index d68484d30f..7b40201a75 100644 --- a/libs/wal_decoder/proto/interpreted_wal.proto +++ b/libs/wal_decoder/proto/interpreted_wal.proto @@ -5,6 +5,7 @@ package interpreted_wal; message InterpretedWalRecords { repeated InterpretedWalRecord records = 1; optional uint64 next_record_lsn = 2; + optional uint64 raw_wal_start_lsn = 3; } message InterpretedWalRecord { diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 51bf7e44ab..7e1934c6c3 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -60,7 +60,11 @@ pub struct InterpretedWalRecords { pub records: Vec, // Start LSN of the next record after the batch. // Note that said record may not belong to the current shard. - pub next_record_lsn: Option, + pub next_record_lsn: Lsn, + // Inclusive start LSN of the PG WAL from which the interpreted + // WAL records were extracted. Note that this is not necessarily the + // start LSN of the first interpreted record in the batch. + pub raw_wal_start_lsn: Option, } /// An interpreted Postgres WAL record, ready to be handled by the pageserver diff --git a/libs/wal_decoder/src/wire_format.rs b/libs/wal_decoder/src/wire_format.rs index 944ee5c919..52ed5c70b5 100644 --- a/libs/wal_decoder/src/wire_format.rs +++ b/libs/wal_decoder/src/wire_format.rs @@ -167,7 +167,8 @@ impl TryFrom for proto::InterpretedWalRecords { .collect::, _>>()?; Ok(proto::InterpretedWalRecords { records, - next_record_lsn: value.next_record_lsn.map(|l| l.0), + next_record_lsn: Some(value.next_record_lsn.0), + raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0), }) } } @@ -254,7 +255,11 @@ impl TryFrom for InterpretedWalRecords { Ok(InterpretedWalRecords { records, - next_record_lsn: value.next_record_lsn.map(Lsn::from), + next_record_lsn: value + .next_record_lsn + .map(Lsn::from) + .expect("Always provided"), + raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from), }) } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index fa098e9364..e2b9a7f073 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> { info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine"); info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode"); info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol"); + info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation"); info!(?conf.page_service_pipelining, "starting with page service pipelining config"); info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config"); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index c5368f6806..09d9444dd5 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -197,6 +197,10 @@ pub struct PageServerConf { /// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer /// files read. pub enable_read_path_debugging: bool, + + /// Interpreted protocol feature: if enabled, validate that the logical WAL received from + /// safekeepers does not have gaps. + pub validate_wal_contiguity: bool, } /// Token for authentication to safekeepers @@ -360,6 +364,7 @@ impl PageServerConf { page_service_pipelining, get_vectored_concurrent_io, enable_read_path_debugging, + validate_wal_contiguity, } = config_toml; let mut conf = PageServerConf { @@ -446,6 +451,7 @@ impl PageServerConf { virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()), no_sync: no_sync.unwrap_or(false), enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false), + validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false), }; // ------------------------------------------------------------ diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b9425d2777..30de4d90dc 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2874,6 +2874,7 @@ impl Timeline { auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(), availability_zone: self.conf.availability_zone.clone(), ingest_batch_size: self.conf.ingest_batch_size, + validate_wal_contiguity: self.conf.validate_wal_contiguity, }, broker_client, ctx, diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index f831f5e48a..67429bff98 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -56,6 +56,7 @@ pub struct WalReceiverConf { pub auth_token: Option>, pub availability_zone: Option, pub ingest_batch_size: u64, + pub validate_wal_contiguity: bool, } pub struct WalReceiver { diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 65f9d39078..1955345315 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -537,6 +537,7 @@ impl ConnectionManagerState { let connect_timeout = self.conf.wal_connect_timeout; let ingest_batch_size = self.conf.ingest_batch_size; let protocol = self.conf.protocol; + let validate_wal_contiguity = self.conf.validate_wal_contiguity; let timeline = Arc::clone(&self.timeline); let ctx = ctx.detached_child( TaskKind::WalReceiverConnectionHandler, @@ -558,6 +559,7 @@ impl ConnectionManagerState { ctx, node_id, ingest_batch_size, + validate_wal_contiguity, ) .await; @@ -1563,6 +1565,7 @@ mod tests { auth_token: None, availability_zone: None, ingest_batch_size: 1, + validate_wal_contiguity: false, }, wal_connection: None, wal_stream_candidates: HashMap::new(), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 23db4f88d2..ff05a8f902 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection( ctx: RequestContext, safekeeper_node: NodeId, ingest_batch_size: u64, + validate_wal_contiguity: bool, ) -> Result<(), WalReceiverError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -274,6 +275,7 @@ pub(super) async fn handle_walreceiver_connection( } => Some((format, compression)), }; + let mut expected_wal_start = startpoint; while let Some(replication_message) = { select! { _ = cancellation.cancelled() => { @@ -340,13 +342,49 @@ pub(super) async fn handle_walreceiver_connection( ) })?; + // Guard against WAL gaps. If the start LSN of the PG WAL section + // from which the interpreted records were extracted, doesn't match + // the end of the previous batch (or the starting point for the first batch), + // then kill this WAL receiver connection and start a new one. + if validate_wal_contiguity { + if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn { + match raw_wal_start_lsn.cmp(&expected_wal_start) { + std::cmp::Ordering::Greater => { + let msg = format!( + "Gap in streamed WAL: [{}, {})", + expected_wal_start, raw_wal_start_lsn + ); + critical!("{msg}"); + return Err(WalReceiverError::Other(anyhow!(msg))); + } + std::cmp::Ordering::Less => { + // Other shards are reading WAL behind us. + // This is valid, but check that we received records + // that we haven't seen before. + if let Some(first_rec) = batch.records.first() { + if first_rec.next_record_lsn < last_rec_lsn { + let msg = format!( + "Received record with next_record_lsn multiple times ({} < {})", + first_rec.next_record_lsn, expected_wal_start + ); + critical!("{msg}"); + return Err(WalReceiverError::Other(anyhow!(msg))); + } + } + } + std::cmp::Ordering::Equal => {} + } + } + } + let InterpretedWalRecords { records, next_record_lsn, + raw_wal_start_lsn: _, } = batch; tracing::debug!( - "Received WAL up to {} with next_record_lsn={:?}", + "Received WAL up to {} with next_record_lsn={}", streaming_lsn, next_record_lsn ); @@ -423,12 +461,11 @@ pub(super) async fn handle_walreceiver_connection( // need to advance last record LSN on all shards. If we've not ingested the latest // record, then set the LSN of the modification past it. This way all shards // advance their last record LSN at the same time. - let needs_last_record_lsn_advance = match next_record_lsn { - Some(lsn) if lsn > modification.get_lsn() => { - modification.set_lsn(lsn).unwrap(); - true - } - _ => false, + let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() { + modification.set_lsn(next_record_lsn).unwrap(); + true + } else { + false }; if uncommitted_records > 0 || needs_last_record_lsn_advance { @@ -446,9 +483,8 @@ pub(super) async fn handle_walreceiver_connection( timeline.get_last_record_lsn() ); - if let Some(lsn) = next_record_lsn { - last_rec_lsn = lsn; - } + last_rec_lsn = next_record_lsn; + expected_wal_start = streaming_lsn; Some(streaming_lsn) } diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index 5916675c3f..fb06339604 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -295,6 +295,10 @@ impl InterpretedWalReader { let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version); + // Tracks the start of the PG WAL LSN from which the current batch of + // interpreted records originated. + let mut current_batch_wal_start_lsn: Option = None; + loop { tokio::select! { // Main branch for reading WAL and forwarding it @@ -302,7 +306,7 @@ impl InterpretedWalReader { let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below")); let WalBytes { wal, - wal_start_lsn: _, + wal_start_lsn, wal_end_lsn, available_wal_end_lsn, } = match wal { @@ -315,6 +319,12 @@ impl InterpretedWalReader { } }; + // We will already have a value if the previous chunks of WAL + // did not decode into anything useful. + if current_batch_wal_start_lsn.is_none() { + current_batch_wal_start_lsn = Some(wal_start_lsn); + } + wal_decoder.feed_bytes(&wal); // Deserialize and interpret WAL records from this batch of WAL. @@ -363,7 +373,9 @@ impl InterpretedWalReader { let max_next_record_lsn = match max_next_record_lsn { Some(lsn) => lsn, - None => { continue; } + None => { + continue; + } }; // Update the current position such that new receivers can decide @@ -377,21 +389,38 @@ impl InterpretedWalReader { } } + let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap(); + // Send interpreted records downstream. Anything that has already been seen // by a shard is filtered out. let mut shard_senders_to_remove = Vec::new(); for (shard, states) in &mut self.shard_senders { for state in states { - if max_next_record_lsn <= state.next_record_lsn { - continue; - } - let shard_sender_id = ShardSenderId::new(*shard, state.sender_id); - let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default(); - let batch = InterpretedWalRecords { - records, - next_record_lsn: Some(max_next_record_lsn), + let batch = if max_next_record_lsn > state.next_record_lsn { + // This batch contains at least one record that this shard has not + // seen yet. + let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default(); + + InterpretedWalRecords { + records, + next_record_lsn: max_next_record_lsn, + raw_wal_start_lsn: Some(batch_wal_start_lsn), + } + } else if wal_end_lsn > state.next_record_lsn { + // All the records in this batch were seen by the shard + // However, the batch maps to a chunk of WAL that the + // shard has not yet seen. Notify it of the start LSN + // of the PG WAL chunk such that it doesn't look like a gap. + InterpretedWalRecords { + records: Vec::default(), + next_record_lsn: state.next_record_lsn, + raw_wal_start_lsn: Some(batch_wal_start_lsn), + } + } else { + // The shard has seen this chunk of WAL before. Skip it. + continue; }; let res = state.tx.send(Batch { @@ -403,7 +432,7 @@ impl InterpretedWalReader { if res.is_err() { shard_senders_to_remove.push(shard_sender_id); } else { - state.next_record_lsn = max_next_record_lsn; + state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn); } } } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 36af522535..1d282971b1 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1167,15 +1167,15 @@ class NeonEnv: "max_batch_size": 32, } - # Concurrent IO (https://github.com/neondatabase/neon/issues/9378): - # enable concurrent IO by default in tests and benchmarks. - # Compat tests are exempt because old versions fail to parse the new config. - get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io if config.test_may_use_compatibility_snapshot_binaries: log.info( - "Forcing use of binary-built-in default to avoid forward-compatibility related test failures" + "Skipping WAL contiguity validation to avoid forward-compatibility related test failures" ) - get_vectored_concurrent_io = None + else: + # Look for gaps in WAL received from safekeepeers + ps_cfg["validate_wal_contiguity"] = True + + get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io if get_vectored_concurrent_io is not None: ps_cfg["get_vectored_concurrent_io"] = { "mode": self.pageserver_get_vectored_concurrent_io, From bd335fa751162f7b0c534b306f3eceb6edd89c4b Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 20 Feb 2025 20:29:14 +0200 Subject: [PATCH 06/11] Fix prototype of CheckPointReplicationState (#10907) ## Problem Occasionally removed (void) from definition of `CheckPointReplicationState` function ## Summary of changes Restore function prototype. https://github.com/neondatabase/postgres/pull/585 https://github.com/neondatabase/postgres/pull/586 --------- Co-authored-by: Konstantin Knizhnik --- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/revisions.json | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 023f1020ec..6ff5044377 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 023f1020ecb07af3bb0ddbf4622e1a3c3fa276a4 +Subproject commit 6ff50443773b69749e16da6db9d4f4b19064b4b7 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 6cb8d22079..261ed10e9b 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 6cb8d22079570b50fcaff29124d40807c1e63a82 +Subproject commit 261ed10e9b8c8dda01ad7aefb18e944e30aa161d diff --git a/vendor/revisions.json b/vendor/revisions.json index 3379cf1ba8..f85cec3a0b 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -5,11 +5,11 @@ ], "v16": [ "16.8", - "6cb8d22079570b50fcaff29124d40807c1e63a82" + "261ed10e9b8c8dda01ad7aefb18e944e30aa161d" ], "v15": [ "15.12", - "023f1020ecb07af3bb0ddbf4622e1a3c3fa276a4" + "6ff50443773b69749e16da6db9d4f4b19064b4b7" ], "v14": [ "14.17", From 5b81a774fc698ea66f972af4463bfe0c5e9c8545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 20 Feb 2025 20:16:22 +0100 Subject: [PATCH 07/11] Update rust to 1.85.0 (#10914) We keep the practice of keeping the compiler up to date, pointing to the latest release. This is done by many other projects in the Rust ecosystem as well. [Announcement blog post](https://blog.rust-lang.org/2025/02/20/Rust-1.85.0.html). Prior update was in #10618. --- build-tools.Dockerfile | 2 +- rust-toolchain.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build-tools.Dockerfile b/build-tools.Dockerfile index 317eded26e..c103ceaea5 100644 --- a/build-tools.Dockerfile +++ b/build-tools.Dockerfile @@ -292,7 +292,7 @@ WORKDIR /home/nonroot # Rust # Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`) -ENV RUSTC_VERSION=1.84.1 +ENV RUSTC_VERSION=1.85.0 ENV RUSTUP_HOME="/home/nonroot/.rustup" ENV PATH="/home/nonroot/.cargo/bin:${PATH}" ARG RUSTFILT_VERSION=0.2.1 diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 38a7f202ba..591d60ea79 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.84.1" +channel = "1.85.0" profile = "default" # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy. # https://rust-lang.github.io/rustup/concepts/profiles.html From 3f376e44babb12e9a1c7c2f57c51628daccfed15 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 20 Feb 2025 21:48:06 +0200 Subject: [PATCH 08/11] Temporarily disable pg_duckdb (#10909) It clashed with pg_mooncake This is the same as the hotfix #10908 , but for the main branch, to keep the release and main branches in sync. In particular, we don't want to accidentally revert this temporary fix, if we cut a new release from main. --- compute/compute-node.Dockerfile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index 19633064a6..7169cbc41d 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -1669,7 +1669,11 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/ -COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/ + +# Disabled temporarily, because it clashed with pg_mooncake. pg_mooncake +# also depends on libduckdb, but a different version. +#COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/ + COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/ From 0b9b391ea0366b896069705b3cdfdf82a9a8e901 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 20 Feb 2025 22:21:44 +0200 Subject: [PATCH 09/11] Fix caclulation of prefetch ring position to fit in-flight request in resized ring buffer (#10899) ## Problem Refer https://github.com/neondatabase/neon/issues/10885 Wait position in ring buffer to restrict number of in-flight requests is not correctly calculated. ## Summary of changes Update condition and remove redundant assertion Co-authored-by: Konstantin Knizhnik --- pgxn/neon/pagestore_smgr.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index f1087a8ccb..6c812f347f 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -474,8 +474,7 @@ readahead_buffer_resize(int newsize, void *extra) */ if (MyPState->n_requests_inflight > newsize) { - Assert(MyPState->ring_unused >= MyPState->n_requests_inflight - newsize); - prefetch_wait_for(MyPState->ring_unused - (MyPState->n_requests_inflight - newsize)); + prefetch_wait_for(MyPState->ring_unused - newsize - 1); Assert(MyPState->n_requests_inflight <= newsize); } From 9b42d1ce1a6e0c8bba0fca397d6d17d200da3d9c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 20 Feb 2025 22:38:42 +0100 Subject: [PATCH 10/11] pageserver: periodically log slow ongoing getpage requests (#10906) ## Problem We don't have good observability for "stuck" getpage requests. Resolves https://github.com/neondatabase/cloud/issues/23808. ## Summary of changes Log a periodic warning (every 30 seconds) if GetPage request execution is slow to complete, to aid in debugging stuck GetPage requests. This does not cover response flushing (we have separate logging for that), nor reading the request from the socket and batching it (expected to be insignificant and not straightforward to handle with the current protocol). This costs 95 nanoseconds on the happy path when awaiting a `tokio::task::yield_now()`: ``` warn_slow/enabled=false time: [45.716 ns 46.116 ns 46.687 ns] warn_slow/enabled=true time: [141.53 ns 141.83 ns 142.18 ns] ``` --- Cargo.lock | 1 + libs/utils/Cargo.toml | 3 ++- libs/utils/benches/README.md | 26 ++++++++++++++++++ libs/utils/benches/benchmarks.rs | 45 +++++++++++++++++++++++++++++--- libs/utils/src/logging.rs | 39 +++++++++++++++++++++++++++ pageserver/src/page_service.rs | 41 ++++++++++++++++++++--------- 6 files changed, 139 insertions(+), 16 deletions(-) create mode 100644 libs/utils/benches/README.md diff --git a/Cargo.lock b/Cargo.lock index 12232eaece..f62026696e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7616,6 +7616,7 @@ dependencies = [ "once_cell", "pin-project-lite", "postgres_connection", + "pprof", "pq_proto", "rand 0.8.5", "regex", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 62e0f4cfba..5020d82adf 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -27,7 +27,7 @@ humantime.workspace = true fail.workspace = true futures = { workspace = true } jsonwebtoken.workspace = true -nix = {workspace = true, features = [ "ioctl" ] } +nix = { workspace = true, features = ["ioctl"] } once_cell.workspace = true pin-project-lite.workspace = true regex.workspace = true @@ -61,6 +61,7 @@ bytes.workspace = true criterion.workspace = true hex-literal.workspace = true camino-tempfile.workspace = true +pprof.workspace = true serde_assert.workspace = true tokio = { workspace = true, features = ["test-util"] } diff --git a/libs/utils/benches/README.md b/libs/utils/benches/README.md new file mode 100644 index 0000000000..e23ec268c2 --- /dev/null +++ b/libs/utils/benches/README.md @@ -0,0 +1,26 @@ +## Utils Benchmarks + +To run benchmarks: + +```sh +# All benchmarks. +cargo bench --package utils + +# Specific file. +cargo bench --package utils --bench benchmarks + +# Specific benchmark. +cargo bench --package utils --bench benchmarks warn_slow/enabled=true + +# List available benchmarks. +cargo bench --package utils --benches -- --list + +# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds. +# Output in target/criterion/*/profile/flamegraph.svg. +cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10 +``` + +Additional charts and statistics are available in `target/criterion/report/index.html`. + +Benchmarks are automatically compared against the previous run. To compare against other runs, see +`--baseline` and `--save-baseline`. \ No newline at end of file diff --git a/libs/utils/benches/benchmarks.rs b/libs/utils/benches/benchmarks.rs index 44eb36387c..cff3792f3a 100644 --- a/libs/utils/benches/benchmarks.rs +++ b/libs/utils/benches/benchmarks.rs @@ -1,5 +1,18 @@ -use criterion::{criterion_group, criterion_main, Criterion}; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, Bencher, Criterion}; +use pprof::criterion::{Output, PProfProfiler}; use utils::id; +use utils::logging::warn_slow; + +// Register benchmarks with Criterion. +criterion_group!( + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_id_stringify, + bench_warn_slow, +); +criterion_main!(benches); pub fn bench_id_stringify(c: &mut Criterion) { // Can only use public methods. @@ -16,5 +29,31 @@ pub fn bench_id_stringify(c: &mut Criterion) { }); } -criterion_group!(benches, bench_id_stringify); -criterion_main!(benches); +pub fn bench_warn_slow(c: &mut Criterion) { + for enabled in [false, true] { + c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| { + run_bench(b, enabled).unwrap() + }); + } + + // The actual benchmark. + fn run_bench(b: &mut Bencher, enabled: bool) -> anyhow::Result<()> { + const THRESHOLD: Duration = Duration::from_secs(1); + + // Use a multi-threaded runtime to avoid thread parking overhead when yielding. + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + // Test both with and without warn_slow, since we're essentially measuring Tokio scheduling + // performance too. Use a simple noop future that yields once, to avoid any scheduler fast + // paths for a ready future. + if enabled { + b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now()))); + } else { + b.iter(|| runtime.block_on(tokio::task::yield_now())); + } + + Ok(()) + } +} diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 4a6069294d..95c69ac8ba 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,9 +1,13 @@ +use std::future::Future; use std::str::FromStr; +use std::time::Duration; use anyhow::Context; use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, VariantNames}; +use tokio::time::Instant; +use tracing::warn; /// Logs a critical error, similarly to `tracing::error!`. This will: /// @@ -318,6 +322,41 @@ impl std::fmt::Debug for SecretString { } } +/// Logs a periodic warning if a future is slow to complete. +/// +/// This is performance-sensitive as it's used on the GetPage read path. +#[inline] +pub async fn warn_slow(name: &str, threshold: Duration, f: impl Future) -> O { + // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and + // won't fit on the stack. + let mut f = Box::pin(f); + + let started = Instant::now(); + let mut attempt = 1; + + loop { + // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common + // case where the timeout doesn't fire. + let deadline = started + attempt * threshold; + if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await { + // NB: we check if we exceeded the threshold even if the timeout never fired, because + // scheduling or execution delays may cause the future to succeed even if it exceeds the + // timeout. This costs an extra unconditional clock reading, but seems worth it to avoid + // false negatives. + let elapsed = started.elapsed(); + if elapsed >= threshold { + warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64()); + } + return output; + } + + let elapsed = started.elapsed().as_secs_f64(); + warn!("slow {name} still running after {elapsed:.3}s",); + + attempt += 1; + } +} + #[cfg(test)] mod tests { use metrics::{core::Opts, IntCounterVec}; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0c8da6f2a8..7285697040 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -34,11 +34,13 @@ use std::str::FromStr; use std::sync::Arc; use std::time::SystemTime; use std::time::{Duration, Instant}; +use strum_macros::IntoStaticStr; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::logging::warn_slow; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; use utils::{ @@ -81,6 +83,9 @@ use std::os::fd::AsRawFd; /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`]. const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); +/// Threshold at which to log a warning about slow GetPage requests. +const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30); + /////////////////////////////////////////////////////////////////////////////// pub struct Listener { @@ -594,6 +599,7 @@ struct BatchedTestRequest { /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum, /// so that we don't keep the [`Timeline::gate`] open while the batch /// is being built up inside the [`spsc_fold`] (pagestream pipelining). +#[derive(IntoStaticStr)] enum BatchedFeMessage { Exists { span: Span, @@ -638,6 +644,10 @@ enum BatchedFeMessage { } impl BatchedFeMessage { + fn as_static_str(&self) -> &'static str { + self.into() + } + fn observe_execution_start(&mut self, at: Instant) { match self { BatchedFeMessage::Exists { timer, .. } @@ -1463,17 +1473,20 @@ impl PageServerHandler { } }; - let err = self - .pagesteam_handle_batched_message( + let result = warn_slow( + msg.as_static_str(), + WARN_SLOW_GETPAGE_THRESHOLD, + self.pagesteam_handle_batched_message( pgb_writer, msg, io_concurrency.clone(), &cancel, protocol_version, ctx, - ) - .await; - match err { + ), + ) + .await; + match result { Ok(()) => {} Err(e) => break e, } @@ -1636,13 +1649,17 @@ impl PageServerHandler { return Err(e); } }; - self.pagesteam_handle_batched_message( - pgb_writer, - batch, - io_concurrency.clone(), - &cancel, - protocol_version, - &ctx, + warn_slow( + batch.as_static_str(), + WARN_SLOW_GETPAGE_THRESHOLD, + self.pagesteam_handle_batched_message( + pgb_writer, + batch, + io_concurrency.clone(), + &cancel, + protocol_version, + &ctx, + ), ) .await?; } From c214c32d3f7f29485226d7baf97ea6f7643769bd Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Thu, 20 Feb 2025 20:56:30 -0500 Subject: [PATCH 11/11] fix(pageserver): avoid creating empty job for gc-compaction (#10917) ## Problem This should be one last fix for https://github.com/neondatabase/neon/issues/10517. ## Summary of changes If a keyspace is empty, we might produce a gc-compaction job which covers no layer files. We should avoid generating such jobs so that the gc-compaction image layer can cover the full key range. Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 31 +++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 58a87dbd5f..0361ce8cd1 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -2212,7 +2212,7 @@ impl Timeline { let sub_compaction_max_job_size_mb = sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB); - let mut compact_jobs = Vec::new(); + let mut compact_jobs = Vec::::new(); // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning // by estimating the amount of files read for a compaction job. We should also partition on LSN. let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone(); @@ -2299,16 +2299,25 @@ impl Timeline { } else { end }; - info!( - "splitting compaction job: {}..{}, estimated_size={}", - start, end, total_size - ); - compact_jobs.push(GcCompactJob { - dry_run: job.dry_run, - compact_key_range: start..end, - compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn, - }); - current_start = Some(end); + if total_size == 0 && !compact_jobs.is_empty() { + info!( + "splitting compaction job: {}..{}, estimated_size={}, extending the previous job", + start, end, total_size + ); + compact_jobs.last_mut().unwrap().compact_key_range.end = end; + current_start = Some(end); + } else { + info!( + "splitting compaction job: {}..{}, estimated_size={}", + start, end, total_size + ); + compact_jobs.push(GcCompactJob { + dry_run: job.dry_run, + compact_key_range: start..end, + compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn, + }); + current_start = Some(end); + } } } Ok(compact_jobs)