Add neon.safekeeper_connstrings GUC

This GUC is very similar to neon.safekeepers, but instead of being
formatted as host:port, it is a Postgres connection string. The purpose
for changing how we connect to safekeepers is so that we can pass
various libpq SSL keyword parameters in the connection string.

A future PR will remove the `neon.safekeepers` GUC.

Signed-off-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
Tristan Partin
2025-05-05 11:00:14 -05:00
parent 0e0ad073bf
commit a3c5981106
22 changed files with 237 additions and 112 deletions

View File

@@ -239,6 +239,7 @@ walproposer-lib: neon-pg-ext-v17
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpq.so $(POSTGRES_INSTALL_DIR)/build/walproposer-lib
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \
pg_strong_random.o
$(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \

View File

@@ -218,7 +218,9 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
.settings
.find("neon.safekeepers")
.find("neon.safekeeper_connstrings")
// TODO(tristan957): Remove the compatibility code here.
.or(spec.cluster.settings.find("neon.safekeepers"))
.ok_or("safekeeper connstrings should be provided")?
.split(',')
.map(|str| str.to_string())

View File

@@ -75,7 +75,7 @@ pub fn write_postgres_conf(
neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(","));
writeln!(
file,
"neon.safekeepers={}",
"neon.safekeeper_connstrings={}",
escape_conf_value(&neon_safekeepers_value)
)?;
}

View File

@@ -30,7 +30,7 @@ mod pg_helpers_tests {
r#"fsync = off
wal_level = logical
hot_standby = on
neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
neon.safekeeper_connstrings = 'host=127.0.0.1 port=6502,host=127.0.0.1 port=6503,host=127.0.0.1 port=6501'
wal_log_hints = on
log_connections = on
shared_buffers = 32768

View File

@@ -632,7 +632,7 @@ struct EndpointStartCmdArgs {
#[clap(
long,
help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
help = "Safekeepers membership generation to prefix neon.safekeeper_connstrings with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations."
)]
safekeepers_generation: Option<u32>,
#[clap(

View File

@@ -454,10 +454,10 @@ impl Endpoint {
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.get_compute_port()))
.map(|sk| format!("host=localhost port={}", sk.get_compute_port()))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
conf.append("neon.safekeeper_connstrings", &safekeepers);
} else {
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
@@ -623,7 +623,8 @@ impl Endpoint {
.iter()
.find(|node| node.id == sk_id)
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
safekeeper_connstrings
.push(format!("host=127.0.0.1 port={}", sk.get_compute_port()));
}
}
Ok(safekeeper_connstrings)

View File

@@ -104,6 +104,11 @@
"value": "safekeeper1:5454,safekeeper2:5454,safekeeper3:5454",
"vartype": "string"
},
{
"name": "neon.safekeeper_connstrings",
"value": "host=safekeeper1 port=5454,host=safekeeper2 port=5454,host=safekeeper3 port=5454",
"vartype": "string"
},
{
"name": "neon.timeline_id",
"value": "TIMELINE_ID",

View File

@@ -24,7 +24,7 @@ because configs may be parsed and dumped into logs.
#### Tokens generation and validation
JWT tokens are signed using a private key.
Compute/pageserver/safekeeper use the private key's public counterpart to validate JWT tokens.
These components should not have access to the private key and may only get tokens from their configuration or external clients.
These components should not have access to the private key and may only get tokens from their configuration or external clients.
The key pair is generated once for an installation of compute/pageserver/safekeeper, e.g. by `neon_local init`.
There is currently no way to rotate the key without bringing down all components.
@@ -117,8 +117,8 @@ pageserver uses JWT tokens for authentication, so the password is really a
token.)
Compute connects to Safekeepers to write and commit data. The list of safekeeper
addresses is given in the `neon.safekeepers` GUC. The connections to the
safekeepers take the password from the `$NEON_AUTH_TOKEN` environment
addresses is given in the `neon.safekeeper_connstrings` GUC. The connections to
the safekeepers take the password from the `$NEON_AUTH_TOKEN` environment
variable, if set.
The `compute_ctl` binary that runs before the PostgreSQL server, and launches

View File

@@ -269,11 +269,13 @@ calls should be retried until they succeed.
When compute receives safekeepers list from control plane it needs to know the
generation to checked whether it should be updated (note that compute may get
safekeeper list from either cplane or safekeepers). Currently `neon.safekeepers`
GUC is just a comma separates list of `host:port`. Let's prefix it with
`g#<generation>:` to this end, so it will look like
safekeeper list from either cplane or safekeepers). Currently
`neon.safekeeper_connstrings` GUC is just a comma separates list of Postgres
connection strings. Let's prefix it with `g#<generation>:` to this end, so it
will look like:
```
g#42:safekeeper-0.eu-central-1.aws.neon.tech:6401,safekeeper-2.eu-central-1.aws.neon.tech:6401,safekeeper-1.eu-central-1.aws.neon.tech:6401
g#42:host=safekeeper-0.eu-central-1.aws.neon.tech port=6401,host=safekeeper-2.eu-central-1.aws.neon.tech port=6401,host=safekeeper-1.eu-central-1.aws.neon.tech port=6401
```
To summarize, list of cplane changes:
@@ -281,7 +283,7 @@ To summarize, list of cplane changes:
- `/notify-safekeepers` endpoint.
- Branch creation call may return list of safekeepers and when it is
present cplane should adopt it instead of choosing on its own like it does currently.
- `neon.safekeepers` GUC should be prefixed with `g#<generation>:`.
- `neon.safekeeper_connstrings` GUC should be prefixed with `g#<generation>:`.
### storage_controller implementation
@@ -455,16 +457,16 @@ So, main loop of per sk reconcile reads `safekeeper_timeline_pending_ops`
joined with timeline configuration to get current conf (with generation `n`)
for the safekeeper and does the jobs, infinitely retrying failures:
1) If node is member (`include`):
- Check if timeline exists on it, if not, call pull_timeline on it from
- Check if timeline exists on it, if not, call pull_timeline on it from
other members
- Call switch configuration to the current
2) If node is not member (`exclude`):
- Call switch configuration to the current, 404 is ok.
3) If timeline is deleted (`delete`), call delete.
In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and
In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and
timeline with generation <= `n` if `op_type` is not `delete`.
In case 3 also remove `safekeeper_timeline_pending_ops`
In case 3 also remove `safekeeper_timeline_pending_ops`
entry + remove `timelines` entry if there is nothing left in `safekeeper_timeline_pending_ops` for the timeline.
Let's consider in details how APIs can be implemented from this angle.
@@ -483,7 +485,7 @@ corruption. The following sequence works:
changes once ingestion starts, insert must not overwrite it (as well as other
fields like membership conf). On the contrary, start_lsn used in the next
step must be set to the value in the db. cplane_notified_generation can be set
to 1 (initial generation) in insert to avoid notifying cplane about initial
to 1 (initial generation) in insert to avoid notifying cplane about initial
conf as cplane will receive it in timeline creation request anyway.
3) Issue timeline creation calls to at least majority of safekeepers. Using
majority here is not necessary but handy because it guarantees that any live
@@ -492,15 +494,15 @@ corruption. The following sequence works:
create timeline special init case. OFC if timeline is already exists call is
ignored.
4) For minority of safekeepers which could have missed creation insert
entries to `safekeeper_timeline_pending_ops`. We won't miss this insertion
because response to cplane is sent only after it has happened, and cplane
entries to `safekeeper_timeline_pending_ops`. We won't miss this insertion
because response to cplane is sent only after it has happened, and cplane
retries the call until 200 response.
There is a small question how request handler (timeline creation in this
case) would interact with per sk reconciler. As always I prefer to do the
simplest possible thing and here it seems to be just waking it up so it
re-reads the db for work to do. Passing work in memory is faster, but
that shouldn't matter, and path to scan db for work will exist anyway,
that shouldn't matter, and path to scan db for work will exist anyway,
simpler to reuse it.
For pg version / wal segment size: while we may persist them in `timelines`
@@ -514,13 +516,13 @@ Timeline migration.
as well as deliver this conf to current ones; poke per sk reconcilers to work
on it. Also any conf change should also poke cplane notifier task(s).
2) Once it becomes possible per alg description above, get out of joint conf
with another CAS. Task should get wakeups from per sk reconcilers because
with another CAS. Task should get wakeups from per sk reconcilers because
conf switch is required for advancement; however retries should be sleep
based as well as LSN advancement might be needed, though in happy path
based as well as LSN advancement might be needed, though in happy path
it isn't. To see whether further transition is possible on wakup migration
executor polls safekeepers per the algorithm. CAS creating new conf with only
new members should again insert entries to `safekeeper_timeline_pending_ops`
to switch them there, as well as `exclude` rows to remove timeline from
to switch them there, as well as `exclude` rows to remove timeline from
old members.
Timeline deletion: just set `deleted_at` on the timeline row and insert
@@ -601,7 +603,7 @@ Let's have the following implementation bits for gradual rollout:
(and returns them in response to cplane) only when it is set to
true.
- control_plane [see above](storage_controller-<->-control-plane interface-and-changes)
prefixes `neon.safekeepers` GUC with generation number. When it is 0
prefixes `neon.safekeeper_connstrings` GUC with generation number. When it is 0
(or prefix not present at all), walproposer behaves as currently, committing on
the provided safekeeper list -- generations are disabled.
If it is non 0 it follows this RFC rules.

View File

@@ -111,7 +111,7 @@ pub struct ComputeSpec {
pub endpoint_id: Option<String>,
/// Safekeeper membership config generation. It is put in
/// neon.safekeepers GUC and serves two purposes:
/// neon.safekeeper_connstrings GUC and serves two purposes:
/// 1) Non zero value forces walproposer to use membership configurations.
/// 2) If walproposer wants to update list of safekeepers to connect to
/// taking them from some safekeeper mconf, it should check what value

View File

@@ -85,8 +85,8 @@
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501",
"name": "neon.safekeeper_connstrings",
"value": "host=127.0.0.1 port=6502,host=127.0.0.1 port=6503,host=127.0.0.1 port=6501",
"vartype": "string"
},
{

View File

@@ -35,6 +35,7 @@ fn main() -> anyhow::Result<()> {
println!("cargo:rustc-link-lib=static=walproposer");
println!("cargo:rustc-link-lib=static=pgport");
println!("cargo:rustc-link-lib=static=pgcommon");
println!("cargo:rustc-link-lib=pq");
println!("cargo:rustc-link-search={walproposer_lib_search_str}");
// Rebuild crate when libwalproposer.a changes

View File

@@ -171,8 +171,8 @@ pub enum WaitResult {
pub struct Config {
/// Tenant and timeline id
pub ttid: TenantTimelineId,
/// List of safekeepers in format `host:port`
pub safekeepers_list: Vec<String>,
/// List of safekeeper connection strings
pub safekeeper_connstrings: Vec<String>,
/// Safekeeper reconnect timeout in milliseconds
pub safekeeper_reconnect_timeout: i32,
/// Safekeeper connection timeout in milliseconds
@@ -185,7 +185,7 @@ pub struct Config {
/// WalProposer main struct. C methods are reexported as Rust functions.
pub struct Wrapper {
wp: *mut WalProposer,
_safekeepers_list_vec: Vec<u8>,
_safekeeper_connstrings_vec: Vec<u8>,
}
impl Wrapper {
@@ -197,18 +197,19 @@ impl Wrapper {
.unwrap()
.into_raw();
let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
let mut safekeeper_connstrings_vec = CString::new(config.safekeeper_connstrings.join(","))
.unwrap()
.into_bytes_with_nul();
assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
assert!(safekeeper_connstrings_vec.len() == safekeeper_connstrings_vec.capacity());
let safekeeper_connstrings =
safekeeper_connstrings_vec.as_mut_ptr() as *mut std::ffi::c_char;
let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
let c_config = WalProposerConfig {
neon_tenant,
neon_timeline,
safekeepers_list,
safekeeper_connstrings,
safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
safekeeper_connection_timeout: config.safekeeper_connection_timeout,
wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
@@ -224,7 +225,7 @@ impl Wrapper {
let wp = unsafe { WalProposerCreate(c_config, api) };
Wrapper {
wp,
_safekeepers_list_vec: safekeepers_list_vec,
_safekeeper_connstrings_vec: safekeeper_connstrings_vec,
}
}
@@ -575,7 +576,7 @@ mod tests {
});
let config = crate::walproposer::Config {
ttid,
safekeepers_list: vec!["localhost:5000".to_string()],
safekeeper_connstrings: vec!["host=localhost port=5000".to_string()],
safekeeper_reconnect_timeout: 1000,
safekeeper_connection_timeout: 10000,
sync_safekeepers: true,

View File

@@ -86,7 +86,7 @@ typedef struct WalProposerConn
* walprop_async_read */
} WalProposerConn;
extern WalProposerConn *libpqwp_connect_start(char *conninfo);
extern WalProposerConn *libpqwp_connect_start(const char *conninfo);
extern bool libpqwp_send_query(WalProposerConn *conn, char *query);
extern WalProposerExecStatusType libpqwp_get_query_result(WalProposerConn *conn);
extern PGAsyncReadResult libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount);

View File

@@ -35,9 +35,11 @@
*
*-------------------------------------------------------------------------
*/
#include <string.h>
#include <sys/resource.h>
#include "postgres.h"
#include "libpq-fe.h"
#include "libpq/pqformat.h"
#include "neon.h"
#include "walproposer.h"
@@ -90,9 +92,8 @@ static void MembershipConfigurationFree(MembershipConfiguration *mconf);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
{
char *host;
char *connstring;
char *sep;
char *port;
WalProposer *wp;
wp = palloc0(sizeof(WalProposer));
@@ -103,72 +104,122 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->mconf.members.len = 0;
wp->mconf.new_members.len = 0;
wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list);
wp_log(LOG, "neon.safekeeper_connstrings=%s", wp->config->safekeeper_connstrings);
/*
* If safekeepers list starts with g# parse generation number followed by
* :
*/
if (strncmp(wp->config->safekeepers_list, "g#", 2) == 0)
if (strncmp(wp->config->safekeeper_connstrings, "g#", 2) == 0)
{
char *endptr;
errno = 0;
wp->safekeepers_generation = strtoul(wp->config->safekeepers_list + 2, &endptr, 10);
wp->safekeepers_generation = strtoul(wp->config->safekeeper_connstrings + 2, &endptr, 10);
if (errno != 0)
{
wp_log(FATAL, "failed to parse neon.safekeepers generation number: %m");
wp_log(FATAL, "failed to parse neon.safekeeper_connstrings generation number: %m");
}
/* Skip past : to the first hostname. */
host = endptr + 1;
connstring = endptr + 1;
}
else
{
wp->safekeepers_generation = INVALID_GENERATION;
host = wp->config->safekeepers_list;
connstring = wp->config->safekeeper_connstrings;
}
wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation);
for (; host != NULL && *host != '\0'; host = sep)
for (; connstring != NULL && *connstring != '\0'; connstring = sep)
{
port = strchr(host, ':');
if (port == NULL)
{
wp_log(FATAL, "port is not specified");
}
*port++ = '\0';
sep = strchr(port, ',');
char *port;
char *errmsg;
Safekeeper *sk;
PQconninfoOption *conninfo_options, *option;
sk = &wp->safekeeper[wp->n_safekeepers];
sep = strchr(connstring, ',');
if (sep != NULL)
*sep++ = '\0';
if (wp->n_safekeepers + 1 >= MAX_SAFEKEEPERS)
{
wp_log(FATAL, "too many safekeepers");
}
wp->safekeeper[wp->n_safekeepers].host = host;
wp->safekeeper[wp->n_safekeepers].port = port;
wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
wp->safekeeper[wp->n_safekeepers].wp = wp;
/* TODO(tristan957): Remove this compatibility code and only keep the
* else branch.
*
* Check if we have a connection string formatted as host:port, and use
* the old parsing code.
*/
port = strchr(connstring, ':');
if (port)
{
sk->host = connstring;
*port++ = '\0';
sk->port = port;
}
else
{
conninfo_options = PQconninfoParse(connstring, &errmsg);
if (!conninfo_options)
wp_log(FATAL, "invalid safekeeper connection string: %s", errmsg);
// Save off the host and port for identification purposes
option = conninfo_options;
while (option)
{
if (!option->keyword)
break;
if (strcmp(option->keyword, "host") == 0)
{
sk->host = pstrdup(option->val);
goto end;
}
if (strcmp(option->keyword, "port") == 0)
{
sk->port = pstrdup(option->val);
goto end;
}
end:
// We've saved both the host and port, so we can skip iterating
// the rest of the list
if (sk->host && sk->port)
break;
option++;
}
PQconninfoFree(conninfo_options);
conninfo_options = option = NULL;
}
sk->state = SS_OFFLINE;
sk->active_state = SS_ACTIVE_SEND;
sk->wp = wp;
{
Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
int written = 0;
written = snprintf((char *) &sk->conninfo, MAXCONNINFO,
written = snprintf(sk->conninfo, sizeof(sk->conninfo),
"host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'",
sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
if (written > MAXCONNINFO || written < 0)
if (written > sizeof(sk->conninfo) || written < 0)
wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
}
initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf);
wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr;
initStringInfo(&sk->outbuf);
sk->startStreamingAt = InvalidXLogRecPtr;
sk->streamingAt = InvalidXLogRecPtr;
wp->n_safekeepers += 1;
}
if (wp->n_safekeepers < 1)
{
wp_log(FATAL, "safekeepers addresses are not specified");
wp_log(FATAL, "safekeepers connection strings are not specified");
}
wp->quorum = wp->n_safekeepers / 2 + 1;

View File

@@ -707,12 +707,11 @@ typedef struct WalProposerConfig
char *neon_timeline;
/*
* Comma-separated list of safekeepers, in the following format:
* host1:port1,host2:port2,host3:port3
* Comma-separated list of safekeeper connection strings
*
* This cstr should be editable.
*/
char *safekeepers_list;
char *safekeeper_connstrings;
/*
* WalProposer reconnects to offline safekeepers once in this interval.
@@ -788,14 +787,15 @@ typedef struct WalProposer
/*
* Generation of the membership conf of which safekeepers[] are presumably
* members. To make cplane life a bit easier and have more control in
* tests with which sks walproposer gets connected neon.safekeepers GUC
* doesn't provide full mconf, only the list of endpoints to connect to.
* We still would like to know generation associated with it because 1) we
* need some handle to enforce using generations in walproposer, and
* non-zero value of this serves the purpose; 2) currently we don't do
* that, but in theory walproposer can update list of safekeepers to
* connect to upon receiving mconf from safekeepers, and generation number
* must be checked to see which list is newer.
* tests with which sks walproposer gets connected
* neon.safekeeper_connstrings GUC doesn't provide full mconf, only the list
* of endpoints to connect to. We still would like to know generation
* associated with it because 1) we need some handle to enforce using
* generations in walproposer, and non-zero value of this serves the
* purpose; 2) currently we don't do that, but in theory walproposer can
* update list of safekeepers to connect to upon receiving mconf from
* safekeepers, and generation number must be checked to see which list is
* newer.
*/
Generation safekeepers_generation;
/* Number of occupied slots in safekeepers[] */

View File

@@ -63,6 +63,7 @@
char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
char *safekeeper_connstrings = "";
int safekeeper_proto_version = 3;
/* Set to true in the walproposer bgw. */
@@ -81,6 +82,7 @@ static HotStandbyFeedback agg_hs_feedback;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void assign_neon_safekeepers(const char *newval, void *extra);
static void assign_neon_safekeeper_connstrings(const char *newval, void *extra);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
@@ -117,8 +119,11 @@ init_walprop_config(bool syncSafekeepers)
{
walprop_config.neon_tenant = neon_tenant;
walprop_config.neon_timeline = neon_timeline;
/* TODO(tristan957): Remove this compatibility code after the control plane
* is updated to pass neon.safekeeper_connstrings
*/
/* WalProposerCreate scribbles directly on it, so pstrdup */
walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
walprop_config.safekeeper_connstrings = pstrdup(wal_acceptors_list[0] == '\0' ? safekeeper_connstrings : wal_acceptors_list);
walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
walprop_config.wal_segment_size = wal_segment_size;
@@ -203,6 +208,15 @@ nwp_register_gucs(void)
* GUC_LIST_QUOTE */
NULL, assign_neon_safekeepers, NULL);
DefineCustomStringVariable(
"neon.safekeeper_connstrings",
"Comma-separated list of safekeeper connection strings with an optional generation prefix of the form g#X:",
NULL,
&safekeeper_connstrings,
"",
PGC_SIGHUP,
GUC_LIST_INPUT,
NULL, assign_neon_safekeeper_connstrings, NULL);
DefineCustomIntVariable(
"neon.safekeeper_reconnect_timeout",
"Walproposer reconnects to offline safekeepers once in this interval.",
@@ -236,24 +250,24 @@ nwp_register_gucs(void)
static int
split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
split_safekeeper_connstrings(char *connstrings, char *safekeepers[])
{
int n_safekeepers = 0;
char *curr_sk = safekeepers_list;
char *curr_sk = connstrings;
for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma)
for (char *comma = connstrings; comma != NULL && *comma != '\0'; curr_sk = comma)
{
if (++n_safekeepers >= MAX_SAFEKEEPERS)
{
wpg_log(FATAL, "too many safekeepers");
}
coma = strchr(coma, ',');
comma = strchr(comma, ',');
safekeepers[n_safekeepers - 1] = curr_sk;
if (coma != NULL)
if (comma != NULL)
{
*coma++ = '\0';
*comma++ = '\0';
}
}
@@ -261,19 +275,19 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
}
/*
* Accept two coma-separated strings with list of safekeeper host:port addresses.
* Accept two comma-separated strings with list of safekeeper host:port addresses.
* Split them into arrays and return false if two sets do not match, ignoring the order.
*/
static bool
safekeepers_cmp(char *old, char *new)
safekeeper_connstrings_cmp(char *old, char *new)
{
char *safekeepers_old[MAX_SAFEKEEPERS];
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
len_old = split_safekeepers_list(old, safekeepers_old);
len_new = split_safekeepers_list(new, safekeepers_new);
len_old = split_safekeeper_connstrings(old, safekeepers_old);
len_new = split_safekeeper_connstrings(new, safekeepers_new);
if (len_old != len_new)
{
@@ -294,6 +308,44 @@ safekeepers_cmp(char *old, char *new)
return true;
}
/*
* GUC assign_hook for neon.safekeeper_connstrings. Restarts walproposer through
* FATAL if the list changed.
*/
static void
assign_neon_safekeeper_connstrings(const char *newval, void *extra)
{
char *newval_copy;
char *oldval;
if (!am_walproposer)
return;
if (!newval)
{
/* should never happen */
wpg_log(FATAL, "neon.safekeeper_connstrings is empty");
}
/* Copy values because we will modify them in split_safekeeper_connstrings() */
newval_copy = pstrdup(newval);
oldval = pstrdup(safekeeper_connstrings);
/*
* TODO: restarting through FATAL is stupid and introduces 1s delay before
* next bgw start. We should refactor walproposer to allow graceful exit
* and thus remove this delay. XXX: If you change anything here, sync with
* test_safekeepers_reconfigure_reorder.
*/
if (!safekeeper_connstrings_cmp(oldval, newval_copy))
{
wpg_log(FATAL, "restarting walproposer to change safekeeper list from \"%s\" to \"%s\"",
safekeeper_connstrings, newval);
}
pfree(newval_copy);
pfree(oldval);
}
/*
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
* the list changed.
@@ -323,7 +375,7 @@ assign_neon_safekeepers(const char *newval, void *extra)
* and thus remove this delay. XXX: If you change anything here, sync with
* test_safekeepers_reconfigure_reorder.
*/
if (!safekeepers_cmp(oldval, newval_copy))
if (!safekeeper_connstrings_cmp(oldval, newval_copy))
{
wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s",
wal_acceptors_list, newval);
@@ -500,8 +552,8 @@ walprop_register_bgworker(void)
{
BackgroundWorker bgw;
/* If no wal acceptors are specified, don't start the background worker. */
if (*wal_acceptors_list == '\0')
/* If no safekeepers are specified, don't start the background worker. */
if (*safekeeper_connstrings == '\0')
return;
memset(&bgw, 0, sizeof(bgw));
@@ -841,7 +893,7 @@ walprop_status(Safekeeper *sk)
}
WalProposerConn *
libpqwp_connect_start(char *conninfo)
libpqwp_connect_start(const char *conninfo)
{
PGconn *pg_conn;

View File

@@ -86,7 +86,7 @@ impl WalProposer {
let config = Config {
ttid,
safekeepers_list: addrs,
safekeeper_connstrings: addrs,
safekeeper_reconnect_timeout: 1000,
safekeeper_connection_timeout: 5000,
sync_safekeepers,

View File

@@ -207,7 +207,7 @@ impl SimulationApi {
// initialize connection state for each safekeeper
let sk_conns = args
.config
.safekeepers_list
.safekeeper_connstrings
.iter()
.map(|s| {
SafekeeperConn::new(

View File

@@ -49,9 +49,9 @@ def test_safekeepers_reconfigure_reorder(
old_sks = ""
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SHOW neon.safekeepers")
cur.execute("SHOW neon.safekeeper_connstrings")
res = cur.fetchone()
assert res is not None, "neon.safekeepers GUC is set"
assert res is not None, "neon.safekeeper_connstrings GUC is set"
old_sks = res[0]
# Reorder safekeepers
@@ -62,9 +62,9 @@ def test_safekeepers_reconfigure_reorder(
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SHOW neon.safekeepers")
cur.execute("SHOW neon.safekeeper_connstrings")
res = cur.fetchone()
assert res is not None, "neon.safekeepers GUC is set"
assert res is not None, "neon.safekeeper_connstrings GUC is set"
new_sks = res[0]
assert new_sks != old_sks, "GUC changes were applied"

View File

@@ -14,7 +14,7 @@ from contextlib import closing
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast
import psycopg2
import psycopg2.errors
@@ -685,7 +685,7 @@ class ProposerPostgres(PgProtocol):
f"neon.timeline_id = '{self.timeline_id}'\n",
f"neon.tenant_id = '{self.tenant_id}'\n",
"neon.pageserver_connstring = ''\n",
f"neon.safekeepers = '{safekeepers}'\n",
f"neon.safekeeper_connstrings = '{safekeepers}'\n",
f"listen_addresses = '{self.listen_addr}'\n",
f"port = '{self.port}'\n",
]
@@ -1464,7 +1464,15 @@ class SafekeeperEnv:
def get_safekeeper_connstrs(self):
assert self.safekeepers is not None, "safekeepers are not initialized"
return ",".join([sk_proc.args[2] for sk_proc in self.safekeepers])
def to_connstring(proc: subprocess.CompletedProcess[Any]) -> str:
"""
Parse <host>:<port> string into Postgres connection string
"""
(host, port, *_) = cast("str", proc.args[2]).split(":")
return f"host={host} port={port}"
return ",".join([to_connstring(sk_proc) for sk_proc in self.safekeepers])
def create_postgres(self):
assert self.tenant_id is not None, "tenant_id is not initialized"
@@ -2000,8 +2008,9 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
"""
Test that having neon.safekeepers starting with g#n: with non zero n enables
generations, which as a side effect disables automatic timeline creation.
Test that having neon.safekeeper_connstrings starting with g#n: with non
zero n enables generations, which as a side effect disables automatic
timeline creation.
This is kind of bootstrapping test: here membership conf & timeline is
created manually, later storcon will do that.
@@ -2032,10 +2041,10 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
def test_explicit_timeline_creation_storcon(neon_env_builder: NeonEnvBuilder):
"""
Test that having neon.safekeepers starting with g#n: with non zero n enables
generations, which as a side effect disables automatic timeline creation.
Like test_explicit_timeline_creation, but asks the storcon to
create membership conf & timeline.
Test that having neon.safekeeper_connstrings starting with g#n: with non
zero n enables generations, which as a side effect disables automatic
timeline creation. Like test_explicit_timeline_creation, but asks the
storcon to create membership conf & timeline.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {

View File

@@ -711,10 +711,10 @@ async def run_quorum_sanity(env: NeonEnv):
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [2], False)
# Test various combinations of membership configurations / neon.safekeepers
# (list of safekeepers endpoint connects to) values / up & down safekeepers and
# check that endpont can start and write data when we have quorum and can't when
# we don't.
# Test various combinations of membership configurations /
# neon.safekeeper_connstrings (list of safekeeper connection strings) values /
# up & down safekeepers and check that endpont can start and write data when we
# have quorum and can't when we don't.
def test_quorum_sanity(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()