walproposer: basic infra to enable generations (#11002)

## Problem

Preparation for https://github.com/neondatabase/neon/issues/10851

## Summary of changes

Add walproposer `safekeepers_generations` field which can be set by
prefixing `neon.safekeepers` GUC with `g#n:`. Non zero value (n) forces
walproposer to use generations. In particular, this also disables
implicit timeline creation as timeline will be created by storcon. Add
test checking this. Also add missing infra: `--safekeepers-generation`
flag to neon_local endpoint start + fix `--start-timeout` flag: it
existed but value wasn't used.
This commit is contained in:
Arseny Sher
2025-03-03 16:20:20 +03:00
committed by GitHub
parent 8669bfe493
commit ef2b50994c
9 changed files with 200 additions and 20 deletions

View File

@@ -1,5 +1,7 @@
use std::fmt::Write as FmtWrite;
use std::fs::{File, OpenOptions};
use std::io;
use std::io::Write;
use std::io::prelude::*;
use std::path::Path;
@@ -55,10 +57,20 @@ pub fn write_postgres_conf(
writeln!(file, "neon.stripe_size={stripe_size}")?;
}
if !spec.safekeeper_connstrings.is_empty() {
let mut neon_safekeepers_value = String::new();
tracing::info!(
"safekeepers_connstrings is not zero, gen: {:?}",
spec.safekeepers_generation
);
// If generation is given, prepend sk list with g#number:
if let Some(generation) = spec.safekeepers_generation {
write!(neon_safekeepers_value, "g#{}:", generation)?;
}
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
writeln!(
file,
"neon.safekeepers={}",
escape_conf_value(&spec.safekeeper_connstrings.join(","))
escape_conf_value(&neon_safekeepers_value)
)?;
}
if let Some(s) = &spec.tenant_id {

View File

@@ -40,6 +40,7 @@ use pageserver_api::models::{ShardParameters, TimelineCreateRequest, TimelineInf
use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
@@ -596,7 +597,15 @@ struct EndpointStartCmdArgs {
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
#[clap(long)]
#[clap(
long,
help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
)]
safekeepers_generation: Option<u32>,
#[clap(
long,
help = "List of safekeepers endpoint will talk to. Normally neon_local chooses them on its own, but this option allows to override."
)]
safekeepers: Option<String>,
#[clap(
@@ -617,9 +626,9 @@ struct EndpointStartCmdArgs {
)]
allow_multiple: bool,
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
#[clap(short = 't', long, value_parser= humantime::parse_duration, help = "timeout until we fail the command")]
#[arg(default_value = "90s")]
start_timeout: Duration,
}
#[derive(clap::Args)]
@@ -1350,6 +1359,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let pageserver_id = args.endpoint_pageserver_id;
let remote_ext_config = &args.remote_ext_config;
let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {
@@ -1425,11 +1435,13 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
endpoint
.start(
&auth_token,
safekeepers_generation,
safekeepers,
pageservers,
remote_ext_config.as_ref(),
stripe_size.0 as usize,
args.create_test_user,
args.start_timeout,
)
.await?;
}

View File

@@ -42,7 +42,7 @@ use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::ConfigurationRequest;
@@ -53,6 +53,7 @@ use compute_api::spec::{
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
@@ -576,14 +577,17 @@ impl Endpoint {
Ok(safekeeper_connstrings)
}
#[allow(clippy::too_many_arguments)]
pub async fn start(
&self,
auth_token: &Option<String>,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(Host, u16)>,
remote_ext_config: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
start_timeout: Duration,
) -> Result<()> {
if self.status() == EndpointStatus::Running {
anyhow::bail!("The endpoint is already running");
@@ -655,6 +659,7 @@ impl Endpoint {
timeline_id: Some(self.timeline_id),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions,
@@ -770,17 +775,18 @@ impl Endpoint {
std::fs::write(pidfile_path, pid.to_string())?;
// Wait for it to start
let mut attempt = 0;
const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
const MAX_ATTEMPTS: u32 = 10 * 90; // Wait up to 1.5 min
let start_at = Instant::now();
loop {
attempt += 1;
match self.get_status().await {
Ok(state) => {
match state.status {
ComputeStatus::Init => {
if attempt == MAX_ATTEMPTS {
bail!("compute startup timed out; still in Init state");
if Instant::now().duration_since(start_at) > start_timeout {
bail!(
"compute startup timed out {:?}; still in Init state",
start_timeout
);
}
// keep retrying
}
@@ -807,8 +813,11 @@ impl Endpoint {
}
}
Err(e) => {
if attempt == MAX_ATTEMPTS {
return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
if Instant::now().duration_since(start_at) > start_timeout {
return Err(e).context(format!(
"timed out {:?} waiting to connect to compute_ctl HTTP",
start_timeout,
));
}
}
}

View File

@@ -101,6 +101,17 @@ pub struct ComputeSpec {
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
/// Safekeeper membership config generation. It is put in
/// neon.safekeepers GUC and serves two purposes:
/// 1) Non zero value forces walproposer to use membership configurations.
/// 2) If walproposer wants to update list of safekeepers to connect to
/// taking them from some safekeeper mconf, it should check what value
/// is newer by comparing the generation.
///
/// Note: it could be SafekeeperGeneration, but this needs linking
/// compute_ctl with postgres_ffi.
#[serde(default)]
pub safekeepers_generation: Option<u32>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,

View File

@@ -83,6 +83,7 @@ static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);
static void UpdateDonorShmem(WalProposer *wp);
static char *MembershipConfigurationToString(MembershipConfiguration *mconf);
static void MembershipConfigurationCopy(MembershipConfiguration *src, MembershipConfiguration *dst);
static void MembershipConfigurationFree(MembershipConfiguration *mconf);
WalProposer *
@@ -97,7 +98,32 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->config = config;
wp->api = api;
for (host = wp->config->safekeepers_list; host != NULL && *host != '\0'; host = sep)
wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list);
/*
* If safekeepers list starts with g# parse generation number followed by
* :
*/
if (strncmp(wp->config->safekeepers_list, "g#", 2) == 0)
{
char *endptr;
errno = 0;
wp->safekeepers_generation = strtoul(wp->config->safekeepers_list + 2, &endptr, 10);
if (errno != 0)
{
wp_log(FATAL, "failed to parse neon.safekeepers generation number: %m");
}
/* Skip past : to the first hostname. */
host = endptr + 1;
}
else
{
host = wp->config->safekeepers_list;
}
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
for (; host != NULL && *host != '\0'; host = sep)
{
port = strchr(host, ':');
if (port == NULL)
@@ -183,6 +209,12 @@ WalProposerFree(WalProposer *wp)
pfree(wp);
}
static bool
WalProposerGenerationsEnabled(WalProposer *wp)
{
return wp->safekeepers_generation != 0;
}
/*
* Create new AppendRequest message and start sending it. This function is
* called from walsender every time the new WAL is available.
@@ -600,10 +632,14 @@ static void
SendStartWALPush(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
/* Forbid implicit timeline creation if generations are enabled. */
char *allow_timeline_creation = WalProposerGenerationsEnabled(wp) ? "false" : "true";
#define CMD_LEN 512
char cmd[CMD_LEN];
snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version);
snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d', allow_timeline_creation '%s')", wp->config->proto_version, allow_timeline_creation);
if (!wp->api.conn_send_query(sk, cmd))
{
wp_log(WARNING, "failed to send '%s' query to safekeeper %s:%s: %s",
@@ -705,6 +741,18 @@ RecvAcceptorGreeting(Safekeeper *sk)
sk->host, sk->port, sk->greetResponse.nodeId, mconf_toml, sk->greetResponse.term);
pfree(mconf_toml);
/*
* Adopt mconf of safekeepers if it is higher. TODO: mconf change should
* restart wp if it started voting.
*/
if (sk->greetResponse.mconf.generation > wp->mconf.generation)
{
MembershipConfigurationFree(&wp->mconf);
MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf);
/* full conf was just logged above */
wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation);
}
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
@@ -1896,7 +1944,8 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
pq_sendint64_le(buf, m->termHistory->entries[i].term);
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
}
/*
/*
* Removed timeline_start_lsn. Still send it as a valid
* value until safekeepers taking it from term history are
* deployed.
@@ -2162,7 +2211,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
}
}
wp_log(FATAL, "unsupported proto_version %d", wp->config->proto_version);
return false; /* keep the compiler quiet */
return false; /* keep the compiler quiet */
}
/*
@@ -2570,6 +2619,18 @@ MembershipConfigurationToString(MembershipConfiguration *mconf)
return s.data;
}
static void
MembershipConfigurationCopy(MembershipConfiguration *src, MembershipConfiguration *dst)
{
dst->generation = src->generation;
dst->members.len = src->members.len;
dst->members.m = palloc0(sizeof(SafekeeperId) * dst->members.len);
memcpy(dst->members.m, src->members.m, sizeof(SafekeeperId) * dst->members.len);
dst->new_members.len = src->new_members.len;
dst->new_members.m = palloc0(sizeof(SafekeeperId) * dst->new_members.len);
memcpy(dst->new_members.m, src->new_members.m, sizeof(SafekeeperId) * dst->new_members.len);
}
static void
MembershipConfigurationFree(MembershipConfiguration *mconf)
{

View File

@@ -160,7 +160,10 @@ typedef struct MemberSet
SafekeeperId *m; /* ids themselves */
} MemberSet;
/* Timeline safekeeper membership configuration. */
/*
* Timeline safekeeper membership configuration as sent in the
* protocol.
*/
typedef struct MembershipConfiguration
{
Generation generation;
@@ -761,8 +764,22 @@ typedef struct WalProposer
/* (n_safekeepers / 2) + 1 */
int quorum;
/*
* Generation of the membership conf of which safekeepers[] are presumably
* members. To make cplane life a bit easier and have more control in
* tests with which sks walproposer gets connected neon.safekeepers GUC
* doesn't provide full mconf, only the list of endpoints to connect to.
* We still would like to know generation associated with it because 1) we
* need some handle to enforce using generations in walproposer, and
* non-zero value of this serves the purpose; 2) currently we don't do
* that, but in theory walproposer can update list of safekeepers to
* connect to upon receiving mconf from safekeepers, and generation number
* must be checked to see which list is newer.
*/
Generation safekeepers_generation;
/* Number of occupied slots in safekeepers[] */
int n_safekeepers;
/* Safekeepers walproposer is connecting to. */
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */

View File

@@ -525,12 +525,14 @@ class NeonLocalCli(AbstractNeonCli):
def endpoint_start(
self,
endpoint_id: str,
safekeepers_generation: int | None = None,
safekeepers: list[int] | None = None,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
create_test_user: bool = False,
basebackup_request_tries: int | None = None,
timeout: str | None = None,
env: dict[str, str] | None = None,
) -> subprocess.CompletedProcess[str]:
args = [
@@ -543,6 +545,8 @@ class NeonLocalCli(AbstractNeonCli):
if remote_ext_config is not None:
args.extend(["--remote-ext-config", remote_ext_config])
if safekeepers_generation is not None:
args.extend(["--safekeepers-generation", str(safekeepers_generation)])
if safekeepers is not None:
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
if endpoint_id is not None:
@@ -553,6 +557,8 @@ class NeonLocalCli(AbstractNeonCli):
args.extend(["--allow-multiple"])
if create_test_user:
args.extend(["--create-test-user"])
if timeout is not None:
args.extend(["--start-timeout", str(timeout)])
res = self.raw_cli(args, extra_env_vars)
res.check_returncode()

View File

@@ -4005,10 +4005,12 @@ class Endpoint(PgProtocol, LogUtils):
self,
remote_ext_config: str | None = None,
pageserver_id: int | None = None,
safekeeper_generation: int | None = None,
safekeepers: list[int] | None = None,
allow_multiple: bool = False,
create_test_user: bool = False,
basebackup_request_tries: int | None = None,
timeout: str | None = None,
env: dict[str, str] | None = None,
) -> Self:
"""
@@ -4018,19 +4020,21 @@ class Endpoint(PgProtocol, LogUtils):
assert self.endpoint_id is not None
# If `safekeepers` is not None, they are remember them as active and use
# in the following commands.
# If `safekeepers` is not None, remember them as active and use in the
# following commands.
if safekeepers is not None:
self.active_safekeepers = safekeepers
self.env.neon_cli.endpoint_start(
self.endpoint_id,
safekeepers_generation=safekeeper_generation,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
create_test_user=create_test_user,
basebackup_request_tries=basebackup_request_tries,
timeout=timeout,
env=env,
)
self._running.release(1)

View File

@@ -2281,6 +2281,54 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
http_cli.timeline_status(tenant_id, timeline_id)
def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
"""
Test that having neon.safekeepers starting with g#n: with non zero n enables
generations, which as a side effect disables automatic timeline creation.
This is kind of bootstrapping test: here membership conf & timeline is
created manually, later storcon will do that.
"""
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps = env.pageservers[0]
ps_http_cli = ps.http_client()
http_clis = [sk.http_client() for sk in env.safekeepers]
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create("main", config_lines=config_lines)
# expected to fail because timeline is not created on safekeepers
with pytest.raises(Exception, match=r".*timed out.*"):
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s")
# figure out initial LSN.
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
init_lsn = ps_timeline_detail["last_record_lsn"]
log.info(f"initial LSN: {init_lsn}")
# sk timeline creation request expects minor version
pg_version = ps_timeline_detail["pg_version"] * 10000
# create inital mconf
sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers]
mconf = Configuration(generation=1, members=sk_ids, new_members=None)
create_r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
)
log.info(f"sending timeline create: {create_r.to_json()}")
for sk_http_cli in http_clis:
sk_http_cli.timeline_create(create_r)
# Once timeline created endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when compute is active, but there are no writes to the timeline. In that case
# pageserver should maintain a single connection to safekeeper and don't attempt