diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index e8056ec7eb..ca24ff76b3 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -1,5 +1,7 @@ +use std::fmt::Write as FmtWrite; use std::fs::{File, OpenOptions}; use std::io; +use std::io::Write; use std::io::prelude::*; use std::path::Path; @@ -55,10 +57,20 @@ pub fn write_postgres_conf( writeln!(file, "neon.stripe_size={stripe_size}")?; } if !spec.safekeeper_connstrings.is_empty() { + let mut neon_safekeepers_value = String::new(); + tracing::info!( + "safekeepers_connstrings is not zero, gen: {:?}", + spec.safekeepers_generation + ); + // If generation is given, prepend sk list with g#number: + if let Some(generation) = spec.safekeepers_generation { + write!(neon_safekeepers_value, "g#{}:", generation)?; + } + neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(",")); writeln!( file, "neon.safekeepers={}", - escape_conf_value(&spec.safekeeper_connstrings.join(",")) + escape_conf_value(&neon_safekeepers_value) )?; } if let Some(s) = &spec.tenant_id { diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index f258025428..375b5d87d0 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -40,6 +40,7 @@ use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInf use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId}; use postgres_backend::AuthType; use postgres_connection::parse_host_port; +use safekeeper_api::membership::SafekeeperGeneration; use safekeeper_api::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, @@ -596,7 +597,15 @@ struct EndpointStartCmdArgs { #[clap(long = "pageserver-id")] endpoint_pageserver_id: Option, - #[clap(long)] + #[clap( + long, + help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations." + )] + safekeepers_generation: Option, + #[clap( + long, + help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override." + )] safekeepers: Option, #[clap( @@ -617,9 +626,9 @@ struct EndpointStartCmdArgs { )] allow_multiple: bool, - #[clap(short = 't', long, help = "timeout until we fail the command")] - #[arg(default_value = "10s")] - start_timeout: humantime::Duration, + #[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")] + #[arg(default_value = "90s")] + start_timeout: Duration, } #[derive(clap::Args)] @@ -1350,6 +1359,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res let pageserver_id = args.endpoint_pageserver_id; let remote_ext_config = &args.remote_ext_config; + let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new); // If --safekeepers argument is given, use only the listed // safekeeper nodes; otherwise all from the env. let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? { @@ -1425,11 +1435,13 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res endpoint .start( &auth_token, + safekeepers_generation, safekeepers, pageservers, remote_ext_config.as_ref(), stripe_size.0 as usize, args.create_test_user, + args.start_timeout, ) .await?; } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 50ccca36fe..87bfbd7570 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -42,7 +42,7 @@ use std::path::PathBuf; use std::process::Command; use std::str::FromStr; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result, anyhow, bail}; use compute_api::requests::ConfigurationRequest; @@ -53,6 +53,7 @@ use compute_api::spec::{ use nix::sys::signal::{Signal, kill}; use pageserver_api::shard::ShardStripeSize; use reqwest::header::CONTENT_TYPE; +use safekeeper_api::membership::SafekeeperGeneration; use serde::{Deserialize, Serialize}; use tracing::debug; use url::Host; @@ -576,14 +577,17 @@ impl Endpoint { Ok(safekeeper_connstrings) } + #[allow(clippy::too_many_arguments)] pub async fn start( &self, auth_token: &Option, + safekeepers_generation: Option, safekeepers: Vec, pageservers: Vec<(Host, u16)>, remote_ext_config: Option<&String>, shard_stripe_size: usize, create_test_user: bool, + start_timeout: Duration, ) -> Result<()> { if self.status() == EndpointStatus::Running { anyhow::bail!("The endpoint is already running"); @@ -655,6 +659,7 @@ impl Endpoint { timeline_id: Some(self.timeline_id), mode: self.mode, pageserver_connstring: Some(pageserver_connstring), + safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()), safekeeper_connstrings, storage_auth_token: auth_token.clone(), remote_extensions, @@ -770,17 +775,18 @@ impl Endpoint { std::fs::write(pidfile_path, pid.to_string())?; // Wait for it to start - let mut attempt = 0; const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100); - const MAX_ATTEMPTS: u32 = 10 * 90; // Wait up to 1.5 min + let start_at = Instant::now(); loop { - attempt += 1; match self.get_status().await { Ok(state) => { match state.status { ComputeStatus::Init => { - if attempt == MAX_ATTEMPTS { - bail!("compute startup timed out; still in Init state"); + if Instant::now().duration_since(start_at) > start_timeout { + bail!( + "compute startup timed out {:?}; still in Init state", + start_timeout + ); } // keep retrying } @@ -807,8 +813,11 @@ impl Endpoint { } } Err(e) => { - if attempt == MAX_ATTEMPTS { - return Err(e).context("timed out waiting to connect to compute_ctl HTTP"); + if Instant::now().duration_since(start_at) > start_timeout { + return Err(e).context(format!( + "timed out {:?} waiting to connect to compute_ctl HTTP", + start_timeout, + )); } } } diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index d02bfd6814..df82d8b449 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -101,6 +101,17 @@ pub struct ComputeSpec { pub timeline_id: Option, pub pageserver_connstring: Option, + /// Safekeeper membership config generation. It is put in + /// neon.safekeepers GUC and serves two purposes: + /// 1) Non zero value forces walproposer to use membership configurations. + /// 2) If walproposer wants to update list of safekeepers to connect to + /// taking them from some safekeeper mconf, it should check what value + /// is newer by comparing the generation. + /// + /// Note: it could be SafekeeperGeneration, but this needs linking + /// compute_ctl with postgres_ffi. + #[serde(default)] + pub safekeepers_generation: Option, #[serde(default)] pub safekeeper_connstrings: Vec, diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 356895aa82..7ec4ec99fc 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -83,6 +83,7 @@ static void AssertEventsOkForState(uint32 events, Safekeeper *sk); static char *FormatEvents(WalProposer *wp, uint32 events); static void UpdateDonorShmem(WalProposer *wp); static char *MembershipConfigurationToString(MembershipConfiguration *mconf); +static void MembershipConfigurationCopy(MembershipConfiguration *src, MembershipConfiguration *dst); static void MembershipConfigurationFree(MembershipConfiguration *mconf); WalProposer * @@ -97,7 +98,32 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp->config = config; wp->api = api; - for (host = wp->config->safekeepers_list; host != NULL && *host != '\0'; host = sep) + wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list); + + /* + * If safekeepers list starts with g# parse generation number followed by + * : + */ + if (strncmp(wp->config->safekeepers_list, "g#", 2) == 0) + { + char *endptr; + + errno = 0; + wp->safekeepers_generation = strtoul(wp->config->safekeepers_list + 2, &endptr, 10); + if (errno != 0) + { + wp_log(FATAL, "failed to parse neon.safekeepers generation number: %m"); + } + /* Skip past : to the first hostname. */ + host = endptr + 1; + } + else + { + host = wp->config->safekeepers_list; + } + wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation); + + for (; host != NULL && *host != '\0'; host = sep) { port = strchr(host, ':'); if (port == NULL) @@ -183,6 +209,12 @@ WalProposerFree(WalProposer *wp) pfree(wp); } +static bool +WalProposerGenerationsEnabled(WalProposer *wp) +{ + return wp->safekeepers_generation != 0; +} + /* * Create new AppendRequest message and start sending it. This function is * called from walsender every time the new WAL is available. @@ -600,10 +632,14 @@ static void SendStartWALPush(Safekeeper *sk) { WalProposer *wp = sk->wp; + + /* Forbid implicit timeline creation if generations are enabled. */ + char *allow_timeline_creation = WalProposerGenerationsEnabled(wp) ? "false" : "true"; #define CMD_LEN 512 char cmd[CMD_LEN]; - snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version); + + snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d', allow_timeline_creation '%s')", wp->config->proto_version, allow_timeline_creation); if (!wp->api.conn_send_query(sk, cmd)) { wp_log(WARNING, "failed to send '%s' query to safekeeper %s:%s: %s", @@ -705,6 +741,18 @@ RecvAcceptorGreeting(Safekeeper *sk) sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term); pfree(mconf_toml); + /* + * Adopt mconf of safekeepers if it is higher. TODO: mconf change should + * restart wp if it started voting. + */ + if (sk->greetResponse.mconf.generation > wp->mconf.generation) + { + MembershipConfigurationFree(&wp->mconf); + MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf); + /* full conf was just logged above */ + wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation); + } + /* Protocol is all good, move to voting. */ sk->state = SS_VOTING; @@ -1896,7 +1944,8 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf pq_sendint64_le(buf, m->termHistory->entries[i].term); pq_sendint64_le(buf, m->termHistory->entries[i].lsn); } - /* + + /* * Removed timeline_start_lsn. Still send it as a valid * value until safekeepers taking it from term history are * deployed. @@ -2162,7 +2211,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) } } wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version); - return false; /* keep the compiler quiet */ + return false; /* keep the compiler quiet */ } /* @@ -2570,6 +2619,18 @@ MembershipConfigurationToString(MembershipConfiguration *mconf) return s.data; } +static void +MembershipConfigurationCopy(MembershipConfiguration *src, MembershipConfiguration *dst) +{ + dst->generation = src->generation; + dst->members.len = src->members.len; + dst->members.m = palloc0(sizeof(SafekeeperId) * dst->members.len); + memcpy(dst->members.m, src->members.m, sizeof(SafekeeperId) * dst->members.len); + dst->new_members.len = src->new_members.len; + dst->new_members.m = palloc0(sizeof(SafekeeperId) * dst->new_members.len); + memcpy(dst->new_members.m, src->new_members.m, sizeof(SafekeeperId) * dst->new_members.len); +} + static void MembershipConfigurationFree(MembershipConfiguration *mconf) { diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index eee55f924f..8d1ae26cac 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -160,7 +160,10 @@ typedef struct MemberSet SafekeeperId *m; /* ids themselves */ } MemberSet; -/* Timeline safekeeper membership configuration. */ +/* + * Timeline safekeeper membership configuration as sent in the + * protocol. + */ typedef struct MembershipConfiguration { Generation generation; @@ -761,8 +764,22 @@ typedef struct WalProposer /* (n_safekeepers / 2) + 1 */ int quorum; + /* + * Generation of the membership conf of which safekeepers[] are presumably + * members. To make cplane life a bit easier and have more control in + * tests with which sks walproposer gets connected neon.safekeepers GUC + * doesn't provide full mconf, only the list of endpoints to connect to. + * We still would like to know generation associated with it because 1) we + * need some handle to enforce using generations in walproposer, and + * non-zero value of this serves the purpose; 2) currently we don't do + * that, but in theory walproposer can update list of safekeepers to + * connect to upon receiving mconf from safekeepers, and generation number + * must be checked to see which list is newer. + */ + Generation safekeepers_generation; /* Number of occupied slots in safekeepers[] */ int n_safekeepers; + /* Safekeepers walproposer is connecting to. */ Safekeeper safekeeper[MAX_SAFEKEEPERS]; /* WAL has been generated up to this point */ diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 97a5a36814..6e53987e7c 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -525,12 +525,14 @@ class NeonLocalCli(AbstractNeonCli): def endpoint_start( self, endpoint_id: str, + safekeepers_generation: int | None = None, safekeepers: list[int] | None = None, remote_ext_config: str | None = None, pageserver_id: int | None = None, allow_multiple: bool = False, create_test_user: bool = False, basebackup_request_tries: int | None = None, + timeout: str | None = None, env: dict[str, str] | None = None, ) -> subprocess.CompletedProcess[str]: args = [ @@ -543,6 +545,8 @@ class NeonLocalCli(AbstractNeonCli): if remote_ext_config is not None: args.extend(["--remote-ext-config", remote_ext_config]) + if safekeepers_generation is not None: + args.extend(["--safekeepers-generation", str(safekeepers_generation)]) if safekeepers is not None: args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) if endpoint_id is not None: @@ -553,6 +557,8 @@ class NeonLocalCli(AbstractNeonCli): args.extend(["--allow-multiple"]) if create_test_user: args.extend(["--create-test-user"]) + if timeout is not None: + args.extend(["--start-timeout", str(timeout)]) res = self.raw_cli(args, extra_env_vars) res.check_returncode() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6001003e53..53df10be49 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4005,10 +4005,12 @@ class Endpoint(PgProtocol, LogUtils): self, remote_ext_config: str | None = None, pageserver_id: int | None = None, + safekeeper_generation: int | None = None, safekeepers: list[int] | None = None, allow_multiple: bool = False, create_test_user: bool = False, basebackup_request_tries: int | None = None, + timeout: str | None = None, env: dict[str, str] | None = None, ) -> Self: """ @@ -4018,19 +4020,21 @@ class Endpoint(PgProtocol, LogUtils): assert self.endpoint_id is not None - # If `safekeepers` is not None, they are remember them as active and use - # in the following commands. + # If `safekeepers` is not None, remember them as active and use in the + # following commands. if safekeepers is not None: self.active_safekeepers = safekeepers self.env.neon_cli.endpoint_start( self.endpoint_id, + safekeepers_generation=safekeeper_generation, safekeepers=self.active_safekeepers, remote_ext_config=remote_ext_config, pageserver_id=pageserver_id, allow_multiple=allow_multiple, create_test_user=create_test_user, basebackup_request_tries=basebackup_request_tries, + timeout=timeout, env=env, ) self._running.release(1) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 0a05189bfb..8f70b460c6 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -2281,6 +2281,54 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder): http_cli.timeline_status(tenant_id, timeline_id) +def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): + """ + Test that having neon.safekeepers starting with g#n: with non zero n enables + generations, which as a side effect disables automatic timeline creation. + + This is kind of bootstrapping test: here membership conf & timeline is + created manually, later storcon will do that. + """ + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + ps = env.pageservers[0] + ps_http_cli = ps.http_client() + + http_clis = [sk.http_client() for sk in env.safekeepers] + + config_lines = [ + "neon.safekeeper_proto_version = 3", + ] + ep = env.endpoints.create("main", config_lines=config_lines) + + # expected to fail because timeline is not created on safekeepers + with pytest.raises(Exception, match=r".*timed out.*"): + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s") + # figure out initial LSN. + ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id) + init_lsn = ps_timeline_detail["last_record_lsn"] + log.info(f"initial LSN: {init_lsn}") + # sk timeline creation request expects minor version + pg_version = ps_timeline_detail["pg_version"] * 10000 + # create inital mconf + sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers] + mconf = Configuration(generation=1, members=sk_ids, new_members=None) + create_r = TimelineCreateRequest( + tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None + ) + log.info(f"sending timeline create: {create_r.to_json()}") + + for sk_http_cli in http_clis: + sk_http_cli.timeline_create(create_r) + # Once timeline created endpoint should start. + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + + # In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries # when compute is active, but there are no writes to the timeline. In that case # pageserver should maintain a single connection to safekeeper and don't attempt