From ef2b50994cab9702a84aca490d70f11fb0d1036b Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 3 Mar 2025 16:20:20 +0300 Subject: [PATCH] walproposer: basic infra to enable generations (#11002) ## Problem Preparation for https://github.com/neondatabase/neon/issues/10851 ## Summary of changes Add walproposer `safekeepers_generations` field which can be set by prefixing `neon.safekeepers` GUC with `g#n:`. Non zero value (n) forces walproposer to use generations. In particular, this also disables implicit timeline creation as timeline will be created by storcon. Add test checking this. Also add missing infra: `--safekeepers-generation` flag to neon_local endpoint start + fix `--start-timeout` flag: it existed but value wasn't used. --- compute_tools/src/config.rs | 14 ++++- control_plane/src/bin/neon_local.rs | 20 +++++-- control_plane/src/endpoint.rs | 25 ++++++--- libs/compute_api/src/spec.rs | 11 ++++ pgxn/neon/walproposer.c | 69 ++++++++++++++++++++++-- pgxn/neon/walproposer.h | 19 ++++++- test_runner/fixtures/neon_cli.py | 6 +++ test_runner/fixtures/neon_fixtures.py | 8 ++- test_runner/regress/test_wal_acceptor.py | 48 +++++++++++++++++ 9 files changed, 200 insertions(+), 20 deletions(-) diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index e8056ec7eb..ca24ff76b3 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -1,5 +1,7 @@ +use std::fmt::Write as FmtWrite; use std::fs::{File, OpenOptions}; use std::io; +use std::io::Write; use std::io::prelude::*; use std::path::Path; @@ -55,10 +57,20 @@ pub fn write_postgres_conf( writeln!(file, "neon.stripe_size={stripe_size}")?; } if !spec.safekeeper_connstrings.is_empty() { + let mut neon_safekeepers_value = String::new(); + tracing::info!( + "safekeepers_connstrings is not zero, gen: {:?}", + spec.safekeepers_generation + ); + // If generation is given, prepend sk list with g#number: + if let Some(generation) = spec.safekeepers_generation { + write!(neon_safekeepers_value, "g#{}:", generation)?; + } + neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(",")); writeln!( file, "neon.safekeepers={}", - escape_conf_value(&spec.safekeeper_connstrings.join(",")) + escape_conf_value(&neon_safekeepers_value) )?; } if let Some(s) = &spec.tenant_id { diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index f258025428..375b5d87d0 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -40,6 +40,7 @@ use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInf use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId}; use postgres_backend::AuthType; use postgres_connection::parse_host_port; +use safekeeper_api::membership::SafekeeperGeneration; use safekeeper_api::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, @@ -596,7 +597,15 @@ struct EndpointStartCmdArgs { #[clap(long = "pageserver-id")] endpoint_pageserver_id: Option, - #[clap(long)] + #[clap( + long, + help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations." + )] + safekeepers_generation: Option, + #[clap( + long, + help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override." + )] safekeepers: Option, #[clap( @@ -617,9 +626,9 @@ struct EndpointStartCmdArgs { )] allow_multiple: bool, - #[clap(short = 't', long, help = "timeout until we fail the command")] - #[arg(default_value = "10s")] - start_timeout: humantime::Duration, + #[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")] + #[arg(default_value = "90s")] + start_timeout: Duration, } #[derive(clap::Args)] @@ -1350,6 +1359,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res let pageserver_id = args.endpoint_pageserver_id; let remote_ext_config = &args.remote_ext_config; + let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new); // If --safekeepers argument is given, use only the listed // safekeeper nodes; otherwise all from the env. let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? { @@ -1425,11 +1435,13 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res endpoint .start( &auth_token, + safekeepers_generation, safekeepers, pageservers, remote_ext_config.as_ref(), stripe_size.0 as usize, args.create_test_user, + args.start_timeout, ) .await?; } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 50ccca36fe..87bfbd7570 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -42,7 +42,7 @@ use std::path::PathBuf; use std::process::Command; use std::str::FromStr; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result, anyhow, bail}; use compute_api::requests::ConfigurationRequest; @@ -53,6 +53,7 @@ use compute_api::spec::{ use nix::sys::signal::{Signal, kill}; use pageserver_api::shard::ShardStripeSize; use reqwest::header::CONTENT_TYPE; +use safekeeper_api::membership::SafekeeperGeneration; use serde::{Deserialize, Serialize}; use tracing::debug; use url::Host; @@ -576,14 +577,17 @@ impl Endpoint { Ok(safekeeper_connstrings) } + #[allow(clippy::too_many_arguments)] pub async fn start( &self, auth_token: &Option, + safekeepers_generation: Option, safekeepers: Vec, pageservers: Vec<(Host, u16)>, remote_ext_config: Option<&String>, shard_stripe_size: usize, create_test_user: bool, + start_timeout: Duration, ) -> Result<()> { if self.status() == EndpointStatus::Running { anyhow::bail!("The endpoint is already running"); @@ -655,6 +659,7 @@ impl Endpoint { timeline_id: Some(self.timeline_id), mode: self.mode, pageserver_connstring: Some(pageserver_connstring), + safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()), safekeeper_connstrings, storage_auth_token: auth_token.clone(), remote_extensions, @@ -770,17 +775,18 @@ impl Endpoint { std::fs::write(pidfile_path, pid.to_string())?; // Wait for it to start - let mut attempt = 0; const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100); - const MAX_ATTEMPTS: u32 = 10 * 90; // Wait up to 1.5 min + let start_at = Instant::now(); loop { - attempt += 1; match self.get_status().await { Ok(state) => { match state.status { ComputeStatus::Init => { - if attempt == MAX_ATTEMPTS { - bail!("compute startup timed out; still in Init state"); + if Instant::now().duration_since(start_at) > start_timeout { + bail!( + "compute startup timed out {:?}; still in Init state", + start_timeout + ); } // keep retrying } @@ -807,8 +813,11 @@ impl Endpoint { } } Err(e) => { - if attempt == MAX_ATTEMPTS { - return Err(e).context("timed out waiting to connect to compute_ctl HTTP"); + if Instant::now().duration_since(start_at) > start_timeout { + return Err(e).context(format!( + "timed out {:?} waiting to connect to compute_ctl HTTP", + start_timeout, + )); } } } diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index d02bfd6814..df82d8b449 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -101,6 +101,17 @@ pub struct ComputeSpec { pub timeline_id: Option, pub pageserver_connstring: Option, + /// Safekeeper membership config generation. It is put in + /// neon.safekeepers GUC and serves two purposes: + /// 1) Non zero value forces walproposer to use membership configurations. + /// 2) If walproposer wants to update list of safekeepers to connect to + /// taking them from some safekeeper mconf, it should check what value + /// is newer by comparing the generation. + /// + /// Note: it could be SafekeeperGeneration, but this needs linking + /// compute_ctl with postgres_ffi. + #[serde(default)] + pub safekeepers_generation: Option, #[serde(default)] pub safekeeper_connstrings: Vec, diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 356895aa82..7ec4ec99fc 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -83,6 +83,7 @@ static void AssertEventsOkForState(uint32 events, Safekeeper *sk); static char *FormatEvents(WalProposer *wp, uint32 events); static void UpdateDonorShmem(WalProposer *wp); static char *MembershipConfigurationToString(MembershipConfiguration *mconf); +static void MembershipConfigurationCopy(MembershipConfiguration *src, MembershipConfiguration *dst); static void MembershipConfigurationFree(MembershipConfiguration *mconf); WalProposer * @@ -97,7 +98,32 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp->config = config; wp->api = api; - for (host = wp->config->safekeepers_list; host != NULL && *host != '\0'; host = sep) + wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list); + + /* + * If safekeepers list starts with g# parse generation number followed by + * : + */ + if (strncmp(wp->config->safekeepers_list, "g#", 2) == 0) + { + char *endptr; + + errno = 0; + wp->safekeepers_generation = strtoul(wp->config->safekeepers_list + 2, &endptr, 10); + if (errno != 0) + { + wp_log(FATAL, "failed to parse neon.safekeepers generation number: %m"); + } + /* Skip past : to the first hostname. */ + host = endptr + 1; + } + else + { + host = wp->config->safekeepers_list; + } + wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation); + + for (; host != NULL && *host != '\0'; host = sep) { port = strchr(host, ':'); if (port == NULL) @@ -183,6 +209,12 @@ WalProposerFree(WalProposer *wp) pfree(wp); } +static bool +WalProposerGenerationsEnabled(WalProposer *wp) +{ + return wp->safekeepers_generation != 0; +} + /* * Create new AppendRequest message and start sending it. This function is * called from walsender every time the new WAL is available. @@ -600,10 +632,14 @@ static void SendStartWALPush(Safekeeper *sk) { WalProposer *wp = sk->wp; + + /* Forbid implicit timeline creation if generations are enabled. */ + char *allow_timeline_creation = WalProposerGenerationsEnabled(wp) ? "false" : "true"; #define CMD_LEN 512 char cmd[CMD_LEN]; - snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version); + + snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d', allow_timeline_creation '%s')", wp->config->proto_version, allow_timeline_creation); if (!wp->api.conn_send_query(sk, cmd)) { wp_log(WARNING, "failed to send '%s' query to safekeeper %s:%s: %s", @@ -705,6 +741,18 @@ RecvAcceptorGreeting(Safekeeper *sk) sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term); pfree(mconf_toml); + /* + * Adopt mconf of safekeepers if it is higher. TODO: mconf change should + * restart wp if it started voting. + */ + if (sk->greetResponse.mconf.generation > wp->mconf.generation) + { + MembershipConfigurationFree(&wp->mconf); + MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf); + /* full conf was just logged above */ + wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation); + } + /* Protocol is all good, move to voting. */ sk->state = SS_VOTING; @@ -1896,7 +1944,8 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf pq_sendint64_le(buf, m->termHistory->entries[i].term); pq_sendint64_le(buf, m->termHistory->entries[i].lsn); } - /* + + /* * Removed timeline_start_lsn. Still send it as a valid * value until safekeepers taking it from term history are * deployed. @@ -2162,7 +2211,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) } } wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version); - return false; /* keep the compiler quiet */ + return false; /* keep the compiler quiet */ } /* @@ -2570,6 +2619,18 @@ MembershipConfigurationToString(MembershipConfiguration *mconf) return s.data; } +static void +MembershipConfigurationCopy(MembershipConfiguration *src, MembershipConfiguration *dst) +{ + dst->generation = src->generation; + dst->members.len = src->members.len; + dst->members.m = palloc0(sizeof(SafekeeperId) * dst->members.len); + memcpy(dst->members.m, src->members.m, sizeof(SafekeeperId) * dst->members.len); + dst->new_members.len = src->new_members.len; + dst->new_members.m = palloc0(sizeof(SafekeeperId) * dst->new_members.len); + memcpy(dst->new_members.m, src->new_members.m, sizeof(SafekeeperId) * dst->new_members.len); +} + static void MembershipConfigurationFree(MembershipConfiguration *mconf) { diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index eee55f924f..8d1ae26cac 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -160,7 +160,10 @@ typedef struct MemberSet SafekeeperId *m; /* ids themselves */ } MemberSet; -/* Timeline safekeeper membership configuration. */ +/* + * Timeline safekeeper membership configuration as sent in the + * protocol. + */ typedef struct MembershipConfiguration { Generation generation; @@ -761,8 +764,22 @@ typedef struct WalProposer /* (n_safekeepers / 2) + 1 */ int quorum; + /* + * Generation of the membership conf of which safekeepers[] are presumably + * members. To make cplane life a bit easier and have more control in + * tests with which sks walproposer gets connected neon.safekeepers GUC + * doesn't provide full mconf, only the list of endpoints to connect to. + * We still would like to know generation associated with it because 1) we + * need some handle to enforce using generations in walproposer, and + * non-zero value of this serves the purpose; 2) currently we don't do + * that, but in theory walproposer can update list of safekeepers to + * connect to upon receiving mconf from safekeepers, and generation number + * must be checked to see which list is newer. + */ + Generation safekeepers_generation; /* Number of occupied slots in safekeepers[] */ int n_safekeepers; + /* Safekeepers walproposer is connecting to. */ Safekeeper safekeeper[MAX_SAFEKEEPERS]; /* WAL has been generated up to this point */ diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 97a5a36814..6e53987e7c 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -525,12 +525,14 @@ class NeonLocalCli(AbstractNeonCli): def endpoint_start( self, endpoint_id: str, + safekeepers_generation: int | None = None, safekeepers: list[int] | None = None, remote_ext_config: str | None = None, pageserver_id: int | None = None, allow_multiple: bool = False, create_test_user: bool = False, basebackup_request_tries: int | None = None, + timeout: str | None = None, env: dict[str, str] | None = None, ) -> subprocess.CompletedProcess[str]: args = [ @@ -543,6 +545,8 @@ class NeonLocalCli(AbstractNeonCli): if remote_ext_config is not None: args.extend(["--remote-ext-config", remote_ext_config]) + if safekeepers_generation is not None: + args.extend(["--safekeepers-generation", str(safekeepers_generation)]) if safekeepers is not None: args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) if endpoint_id is not None: @@ -553,6 +557,8 @@ class NeonLocalCli(AbstractNeonCli): args.extend(["--allow-multiple"]) if create_test_user: args.extend(["--create-test-user"]) + if timeout is not None: + args.extend(["--start-timeout", str(timeout)]) res = self.raw_cli(args, extra_env_vars) res.check_returncode() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6001003e53..53df10be49 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4005,10 +4005,12 @@ class Endpoint(PgProtocol, LogUtils): self, remote_ext_config: str | None = None, pageserver_id: int | None = None, + safekeeper_generation: int | None = None, safekeepers: list[int] | None = None, allow_multiple: bool = False, create_test_user: bool = False, basebackup_request_tries: int | None = None, + timeout: str | None = None, env: dict[str, str] | None = None, ) -> Self: """ @@ -4018,19 +4020,21 @@ class Endpoint(PgProtocol, LogUtils): assert self.endpoint_id is not None - # If `safekeepers` is not None, they are remember them as active and use - # in the following commands. + # If `safekeepers` is not None, remember them as active and use in the + # following commands. if safekeepers is not None: self.active_safekeepers = safekeepers self.env.neon_cli.endpoint_start( self.endpoint_id, + safekeepers_generation=safekeeper_generation, safekeepers=self.active_safekeepers, remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, allow_multiple=allow_multiple, create_test_user=create_test_user, basebackup_request_tries=basebackup_request_tries, + timeout=timeout, env=env, ) self._running.release(1) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 0a05189bfb..8f70b460c6 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -2281,6 +2281,54 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder): http_cli.timeline_status(tenant_id, timeline_id) +def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): + """ + Test that having neon.safekeepers starting with g#n: with non zero n enables + generations, which as a side effect disables automatic timeline creation. + + This is kind of bootstrapping test: here membership conf & timeline is + created manually, later storcon will do that. + """ + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + ps = env.pageservers[0] + ps_http_cli = ps.http_client() + + http_clis = [sk.http_client() for sk in env.safekeepers] + + config_lines = [ + "neon.safekeeper_proto_version = 3", + ] + ep = env.endpoints.create("main", config_lines=config_lines) + + # expected to fail because timeline is not created on safekeepers + with pytest.raises(Exception, match=r".*timed out.*"): + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s") + # figure out initial LSN. + ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id) + init_lsn = ps_timeline_detail["last_record_lsn"] + log.info(f"initial LSN: {init_lsn}") + # sk timeline creation request expects minor version + pg_version = ps_timeline_detail["pg_version"] * 10000 + # create inital mconf + sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers] + mconf = Configuration(generation=1, members=sk_ids, new_members=None) + create_r = TimelineCreateRequest( + tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None + ) + log.info(f"sending timeline create: {create_r.to_json()}") + + for sk_http_cli in http_clis: + sk_http_cli.timeline_create(create_r) + # Once timeline created endpoint should start. + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + + # In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries # when compute is active, but there are no writes to the timeline. In that case # pageserver should maintain a single connection to safekeeper and don't attempt