diff --git a/Makefile b/Makefile index 0911465fb8..088c891ba8 100644 --- a/Makefile +++ b/Makefile @@ -239,6 +239,7 @@ walproposer-lib: neon-pg-ext-v17 -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib + cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpq.so $(POSTGRES_INSTALL_DIR)/build/walproposer-lib $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \ pg_strong_random.o $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \ diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 8834f0d63d..1d0bcfc2c5 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -218,7 +218,9 @@ impl TryFrom for ParsedSpec { if matches!(spec.mode, ComputeMode::Primary) { spec.cluster .settings - .find("neon.safekeepers") + .find("neon.safekeeper_connstrings") + // TODO(tristan957): Remove the compatibility code here. + .or(spec.cluster.settings.find("neon.safekeepers")) .ok_or("safekeeper connstrings should be provided")? .split(',') .map(|str| str.to_string()) diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 71c6123c3b..d53782e857 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -75,7 +75,7 @@ pub fn write_postgres_conf( neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(",")); writeln!( file, - "neon.safekeepers={}", + "neon.safekeeper_connstrings={}", escape_conf_value(&neon_safekeepers_value) )?; } diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs index b72c1293ee..926297289e 100644 --- a/compute_tools/tests/pg_helpers_tests.rs +++ b/compute_tools/tests/pg_helpers_tests.rs @@ -30,7 +30,7 @@ mod pg_helpers_tests { r#"fsync = off wal_level = logical hot_standby = on -neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501' +neon.safekeeper_connstrings = 'host=127.0.0.1 port=6502,host=127.0.0.1 port=6503,host=127.0.0.1 port=6501' wal_log_hints = on log_connections = on shared_buffers = 32768 diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 6f55c0310f..159d93d555 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -632,7 +632,7 @@ struct EndpointStartCmdArgs { #[clap( long, - help = "Safekeepers membership generation to prefix neon.safekeepers with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations." + help = "Safekeepers membership generation to prefix neon.safekeeper_connstrings with. Normally neon_local sets it on its own, but this option allows to override. Non zero value forces endpoint to use membership configurations." 
)] safekeepers_generation: Option, #[clap( diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4071b620d6..7aea8d726f 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -454,10 +454,10 @@ impl Endpoint { .env .safekeepers .iter() - .map(|sk| format!("localhost:{}", sk.get_compute_port())) + .map(|sk| format!("host=localhost port={}", sk.get_compute_port())) .collect::>() .join(","); - conf.append("neon.safekeepers", &safekeepers); + conf.append("neon.safekeeper_connstrings", &safekeepers); } else { // We only use setup without safekeepers for tests, // and don't care about data durability on pageserver, @@ -623,7 +623,8 @@ impl Endpoint { .iter() .find(|node| node.id == sk_id) .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?; - safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port())); + safekeeper_connstrings + .push(format!("host=127.0.0.1 port={}", sk.get_compute_port())); } } Ok(safekeeper_connstrings) diff --git a/docker-compose/compute_wrapper/var/db/postgres/configs/config.json b/docker-compose/compute_wrapper/var/db/postgres/configs/config.json index 21caf3800c..89c66f3fb3 100644 --- a/docker-compose/compute_wrapper/var/db/postgres/configs/config.json +++ b/docker-compose/compute_wrapper/var/db/postgres/configs/config.json @@ -104,6 +104,11 @@ "value": "safekeeper1:5454,safekeeper2:5454,safekeeper3:5454", "vartype": "string" }, + { + "name": "neon.safekeeper_connstrings", + "value": "host=safekeeper1 port=5454,host=safekeeper2 port=5454,host=safekeeper3 port=5454", + "vartype": "string" + }, { "name": "neon.timeline_id", "value": "TIMELINE_ID", diff --git a/docs/authentication.md b/docs/authentication.md index 522c5481b4..9faea4a595 100644 --- a/docs/authentication.md +++ b/docs/authentication.md @@ -24,7 +24,7 @@ because configs may be parsed and dumped into logs. #### Tokens generation and validation JWT tokens are signed using a private key. Compute/pageserver/safekeeper use the private key's public counterpart to validate JWT tokens. -These components should not have access to the private key and may only get tokens from their configuration or external clients. +These components should not have access to the private key and may only get tokens from their configuration or external clients. The key pair is generated once for an installation of compute/pageserver/safekeeper, e.g. by `neon_local init`. There is currently no way to rotate the key without bringing down all components. @@ -117,8 +117,8 @@ pageserver uses JWT tokens for authentication, so the password is really a token.) Compute connects to Safekeepers to write and commit data. The list of safekeeper -addresses is given in the `neon.safekeepers` GUC. The connections to the -safekeepers take the password from the `$NEON_AUTH_TOKEN` environment +addresses is given in the `neon.safekeeper_connstrings` GUC. The connections to +the safekeepers take the password from the `$NEON_AUTH_TOKEN` environment variable, if set. The `compute_ctl` binary that runs before the PostgreSQL server, and launches diff --git a/docs/rfcs/035-safekeeper-dynamic-membership-change.md b/docs/rfcs/035-safekeeper-dynamic-membership-change.md index 9b320c7285..9ddba860a5 100644 --- a/docs/rfcs/035-safekeeper-dynamic-membership-change.md +++ b/docs/rfcs/035-safekeeper-dynamic-membership-change.md @@ -269,11 +269,13 @@ calls should be retried until they succeed. 
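Editorial note: the RFC hunk continuing below replaces the `neon.safekeepers` list of `host:port` pairs with `neon.safekeeper_connstrings`, a comma-separated list of libpq connection strings that may carry a generation prefix of the form `g#42:`. The std-only Rust sketch below is a hypothetical illustration of splitting such a value; the function name and error handling are ours, not the PR's.

```rust
// Hypothetical helper, not code from this PR: split a GUC value such as
// "g#42:host=sk-0 port=6401,host=sk-1 port=6401" into the membership
// generation and the individual safekeeper connection strings.
fn split_safekeeper_connstrings(raw: &str) -> (u32, Vec<String>) {
    // The "g#<generation>:" prefix is optional; 0 means generations are disabled.
    let (generation, rest) = match raw.strip_prefix("g#") {
        Some(tail) => {
            let (gen_str, rest) = tail
                .split_once(':')
                .expect("expected ':' after the generation number");
            (gen_str.parse().expect("generation must be a number"), rest)
        }
        None => (0, raw),
    };
    // Each comma-separated element is a libpq keyword/value connection string.
    let connstrings = rest.split(',').map(str::to_owned).collect();
    (generation, connstrings)
}

fn main() {
    let (generation, sks) =
        split_safekeeper_connstrings("g#42:host=sk-0 port=6401,host=sk-1 port=6401");
    assert_eq!(generation, 42);
    assert_eq!(sks.len(), 2);
    println!("generation={generation} safekeepers={sks:?}");
}
```

Per the RFC, the generation prefix exists so walproposer can tell whether the list it was started with is newer or older than a membership configuration it later learns from the safekeepers.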
When compute receives safekeepers list from control plane it needs to know the generation to checked whether it should be updated (note that compute may get -safekeeper list from either cplane or safekeepers). Currently `neon.safekeepers` -GUC is just a comma separates list of `host:port`. Let's prefix it with -`g#:` to this end, so it will look like +safekeeper list from either cplane or safekeepers). Currently +`neon.safekeeper_connstrings` GUC is just a comma separates list of Postgres +connection strings. Let's prefix it with `g#:` to this end, so it +will look like: + ``` -g#42:safekeeper-0.eu-central-1.aws.neon.tech:6401,safekeeper-2.eu-central-1.aws.neon.tech:6401,safekeeper-1.eu-central-1.aws.neon.tech:6401 +g#42:host=safekeeper-0.eu-central-1.aws.neon.tech port=6401,host=safekeeper-2.eu-central-1.aws.neon.tech port=6401,host=safekeeper-1.eu-central-1.aws.neon.tech port=6401 ``` To summarize, list of cplane changes: @@ -281,7 +283,7 @@ To summarize, list of cplane changes: - `/notify-safekeepers` endpoint. - Branch creation call may return list of safekeepers and when it is present cplane should adopt it instead of choosing on its own like it does currently. -- `neon.safekeepers` GUC should be prefixed with `g#:`. +- `neon.safekeeper_connstrings` GUC should be prefixed with `g#:`. ### storage_controller implementation @@ -455,16 +457,16 @@ So, main loop of per sk reconcile reads `safekeeper_timeline_pending_ops` joined with timeline configuration to get current conf (with generation `n`) for the safekeeper and does the jobs, infinitely retrying failures: 1) If node is member (`include`): - - Check if timeline exists on it, if not, call pull_timeline on it from + - Check if timeline exists on it, if not, call pull_timeline on it from other members - Call switch configuration to the current 2) If node is not member (`exclude`): - Call switch configuration to the current, 404 is ok. 3) If timeline is deleted (`delete`), call delete. -In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and +In cases 1 and 2 remove `safekeeper_timeline_pending_ops` for the sk and timeline with generation <= `n` if `op_type` is not `delete`. -In case 3 also remove `safekeeper_timeline_pending_ops` +In case 3 also remove `safekeeper_timeline_pending_ops` entry + remove `timelines` entry if there is nothing left in `safekeeper_timeline_pending_ops` for the timeline. Let's consider in details how APIs can be implemented from this angle. @@ -483,7 +485,7 @@ corruption. The following sequence works: changes once ingestion starts, insert must not overwrite it (as well as other fields like membership conf). On the contrary, start_lsn used in the next step must be set to the value in the db. cplane_notified_generation can be set - to 1 (initial generation) in insert to avoid notifying cplane about initial + to 1 (initial generation) in insert to avoid notifying cplane about initial conf as cplane will receive it in timeline creation request anyway. 3) Issue timeline creation calls to at least majority of safekeepers. Using majority here is not necessary but handy because it guarantees that any live @@ -492,15 +494,15 @@ corruption. The following sequence works: create timeline special init case. OFC if timeline is already exists call is ignored. 4) For minority of safekeepers which could have missed creation insert - entries to `safekeeper_timeline_pending_ops`. 
We won't miss this insertion - because response to cplane is sent only after it has happened, and cplane + entries to `safekeeper_timeline_pending_ops`. We won't miss this insertion + because response to cplane is sent only after it has happened, and cplane retries the call until 200 response. There is a small question how request handler (timeline creation in this case) would interact with per sk reconciler. As always I prefer to do the simplest possible thing and here it seems to be just waking it up so it re-reads the db for work to do. Passing work in memory is faster, but - that shouldn't matter, and path to scan db for work will exist anyway, + that shouldn't matter, and path to scan db for work will exist anyway, simpler to reuse it. For pg version / wal segment size: while we may persist them in `timelines` @@ -514,13 +516,13 @@ Timeline migration. as well as deliver this conf to current ones; poke per sk reconcilers to work on it. Also any conf change should also poke cplane notifier task(s). 2) Once it becomes possible per alg description above, get out of joint conf - with another CAS. Task should get wakeups from per sk reconcilers because + with another CAS. Task should get wakeups from per sk reconcilers because conf switch is required for advancement; however retries should be sleep - based as well as LSN advancement might be needed, though in happy path + based as well as LSN advancement might be needed, though in happy path it isn't. To see whether further transition is possible on wakup migration executor polls safekeepers per the algorithm. CAS creating new conf with only new members should again insert entries to `safekeeper_timeline_pending_ops` - to switch them there, as well as `exclude` rows to remove timeline from + to switch them there, as well as `exclude` rows to remove timeline from old members. Timeline deletion: just set `deleted_at` on the timeline row and insert @@ -601,7 +603,7 @@ Let's have the following implementation bits for gradual rollout: (and returns them in response to cplane) only when it is set to true. - control_plane [see above](storage_controller-<->-control-plane interface-and-changes) - prefixes `neon.safekeepers` GUC with generation number. When it is 0 + prefixes `neon.safekeeper_connstrings` GUC with generation number. When it is 0 (or prefix not present at all), walproposer behaves as currently, committing on the provided safekeeper list -- generations are disabled. If it is non 0 it follows this RFC rules. diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index ad246c48ec..ed2affbe08 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -111,7 +111,7 @@ pub struct ComputeSpec { pub endpoint_id: Option, /// Safekeeper membership config generation. It is put in - /// neon.safekeepers GUC and serves two purposes: + /// neon.safekeeper_connstrings GUC and serves two purposes: /// 1) Non zero value forces walproposer to use membership configurations. 
/// 2) If walproposer wants to update list of safekeepers to connect to /// taking them from some safekeeper mconf, it should check what value diff --git a/libs/compute_api/tests/cluster_spec.json b/libs/compute_api/tests/cluster_spec.json index 37de24be5b..52ed41725d 100644 --- a/libs/compute_api/tests/cluster_spec.json +++ b/libs/compute_api/tests/cluster_spec.json @@ -85,8 +85,8 @@ "vartype": "bool" }, { - "name": "neon.safekeepers", - "value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501", + "name": "neon.safekeeper_connstrings", + "value": "host=127.0.0.1 port=6502,host=127.0.0.1 port=6503,host=127.0.0.1 port=6501", "vartype": "string" }, { diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index 530ceb1327..e1fd8e24db 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -35,6 +35,7 @@ fn main() -> anyhow::Result<()> { println!("cargo:rustc-link-lib=static=walproposer"); println!("cargo:rustc-link-lib=static=pgport"); println!("cargo:rustc-link-lib=static=pgcommon"); + println!("cargo:rustc-link-lib=pq"); println!("cargo:rustc-link-search={walproposer_lib_search_str}"); // Rebuild crate when libwalproposer.a changes diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index 4e50c21fca..8d1a05dad4 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -171,8 +171,8 @@ pub enum WaitResult { pub struct Config { /// Tenant and timeline id pub ttid: TenantTimelineId, - /// List of safekeepers in format `host:port` - pub safekeepers_list: Vec, + /// List of safekeeper connection strings + pub safekeeper_connstrings: Vec, /// Safekeeper reconnect timeout in milliseconds pub safekeeper_reconnect_timeout: i32, /// Safekeeper connection timeout in milliseconds @@ -185,7 +185,7 @@ pub struct Config { /// WalProposer main struct. C methods are reexported as Rust functions. 
pub struct Wrapper { wp: *mut WalProposer, - _safekeepers_list_vec: Vec, + _safekeeper_connstrings_vec: Vec, } impl Wrapper { @@ -197,18 +197,19 @@ impl Wrapper { .unwrap() .into_raw(); - let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(",")) + let mut safekeeper_connstrings_vec = CString::new(config.safekeeper_connstrings.join(",")) .unwrap() .into_bytes_with_nul(); - assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity()); - let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char; + assert!(safekeeper_connstrings_vec.len() == safekeeper_connstrings_vec.capacity()); + let safekeeper_connstrings = + safekeeper_connstrings_vec.as_mut_ptr() as *mut std::ffi::c_char; let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void; let c_config = WalProposerConfig { neon_tenant, neon_timeline, - safekeepers_list, + safekeeper_connstrings, safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout, safekeeper_connection_timeout: config.safekeeper_connection_timeout, wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB @@ -224,7 +225,7 @@ impl Wrapper { let wp = unsafe { WalProposerCreate(c_config, api) }; Wrapper { wp, - _safekeepers_list_vec: safekeepers_list_vec, + _safekeeper_connstrings_vec: safekeeper_connstrings_vec, } } @@ -575,7 +576,7 @@ mod tests { }); let config = crate::walproposer::Config { ttid, - safekeepers_list: vec!["localhost:5000".to_string()], + safekeeper_connstrings: vec!["host=localhost port=5000".to_string()], safekeeper_reconnect_timeout: 1000, safekeeper_connection_timeout: 10000, sync_safekeepers: true, diff --git a/pgxn/neon/libpqwalproposer.h b/pgxn/neon/libpqwalproposer.h index cd7e568a47..8bbb3f4bdc 100644 --- a/pgxn/neon/libpqwalproposer.h +++ b/pgxn/neon/libpqwalproposer.h @@ -86,7 +86,7 @@ typedef struct WalProposerConn * walprop_async_read */ } WalProposerConn; -extern WalProposerConn *libpqwp_connect_start(char *conninfo); +extern WalProposerConn *libpqwp_connect_start(const char *conninfo); extern bool libpqwp_send_query(WalProposerConn *conn, char *query); extern WalProposerExecStatusType libpqwp_get_query_result(WalProposerConn *conn); extern PGAsyncReadResult libpqwp_async_read(WalProposerConn *conn, char **buf, int *amount); diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 3befb42030..fba33c8745 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -35,9 +35,11 @@ * *------------------------------------------------------------------------- */ +#include #include #include "postgres.h" +#include "libpq-fe.h" #include "libpq/pqformat.h" #include "neon.h" #include "walproposer.h" @@ -90,9 +92,8 @@ static void MembershipConfigurationFree(MembershipConfiguration *mconf); WalProposer * WalProposerCreate(WalProposerConfig *config, walproposer_api api) { - char *host; + char *connstring; char *sep; - char *port; WalProposer *wp; wp = palloc0(sizeof(WalProposer)); @@ -103,72 +104,122 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp->mconf.members.len = 0; wp->mconf.new_members.len = 0; - wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list); + wp_log(LOG, "neon.safekeeper_connstrings=%s", wp->config->safekeeper_connstrings); /* * If safekeepers list starts with g# parse generation number followed by * : */ - if (strncmp(wp->config->safekeepers_list, "g#", 2) == 0) + if (strncmp(wp->config->safekeeper_connstrings, "g#", 2) == 0) { char *endptr; errno = 0; - wp->safekeepers_generation = 
strtoul(wp->config->safekeepers_list + 2, &endptr, 10); + wp->safekeepers_generation = strtoul(wp->config->safekeeper_connstrings + 2, &endptr, 10); if (errno != 0) { - wp_log(FATAL, "failed to parse neon.safekeepers generation number: %m"); + wp_log(FATAL, "failed to parse neon.safekeeper_connstrings generation number: %m"); } /* Skip past : to the first hostname. */ - host = endptr + 1; + connstring = endptr + 1; } else { wp->safekeepers_generation = INVALID_GENERATION; - host = wp->config->safekeepers_list; + connstring = wp->config->safekeeper_connstrings; } wp_log(LOG, "safekeepers_generation=%u", wp->safekeepers_generation); - for (; host != NULL && *host != '\0'; host = sep) + for (; connstring != NULL && *connstring != '\0'; connstring = sep) { - port = strchr(host, ':'); - if (port == NULL) - { - wp_log(FATAL, "port is not specified"); - } - *port++ = '\0'; - sep = strchr(port, ','); + char *port; + char *errmsg; + Safekeeper *sk; + PQconninfoOption *conninfo_options, *option; + + sk = &wp->safekeeper[wp->n_safekeepers]; + + sep = strchr(connstring, ','); if (sep != NULL) *sep++ = '\0'; if (wp->n_safekeepers + 1 >= MAX_SAFEKEEPERS) { wp_log(FATAL, "too many safekeepers"); } - wp->safekeeper[wp->n_safekeepers].host = host; - wp->safekeeper[wp->n_safekeepers].port = port; - wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE; - wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND; - wp->safekeeper[wp->n_safekeepers].wp = wp; + + /* TODO(tristan957): Remove this compatibility code and only keep the + * else branch. + * + * Check if we have a connection string formatted as host:port, and use + * the old parsing code. + */ + port = strchr(connstring, ':'); + if (port) + { + sk->host = connstring; + *port++ = '\0'; + sk->port = port; + } + else + { + conninfo_options = PQconninfoParse(connstring, &errmsg); + if (!conninfo_options) + wp_log(FATAL, "invalid safekeeper connection string: %s", errmsg); + + // Save off the host and port for identification purposes + option = conninfo_options; + while (option) + { + if (!option->keyword) + break; + + if (strcmp(option->keyword, "host") == 0) + { + sk->host = pstrdup(option->val); + goto end; + } + + if (strcmp(option->keyword, "port") == 0) + { + sk->port = pstrdup(option->val); + goto end; + } + + end: + // We've saved both the host and port, so we can skip iterating + // the rest of the list + if (sk->host && sk->port) + break; + + option++; + } + + PQconninfoFree(conninfo_options); + conninfo_options = option = NULL; + } + + sk->state = SS_OFFLINE; + sk->active_state = SS_ACTIVE_SEND; + sk->wp = wp; { - Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers]; int written = 0; - written = snprintf((char *) &sk->conninfo, MAXCONNINFO, + written = snprintf(sk->conninfo, sizeof(sk->conninfo), "host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'", sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant); - if (written > MAXCONNINFO || written < 0) + if (written > sizeof(sk->conninfo) || written < 0) wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); } - initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf); - wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr; - wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr; + initStringInfo(&sk->outbuf); + sk->startStreamingAt = InvalidXLogRecPtr; + sk->streamingAt = InvalidXLogRecPtr; wp->n_safekeepers += 1; } if (wp->n_safekeepers < 1) { - wp_log(FATAL, "safekeepers addresses 
are not specified"); + wp_log(FATAL, "safekeepers connection strings are not specified"); } wp->quorum = wp->n_safekeepers / 2 + 1; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 83ef72d3d7..f2d51a6981 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -707,12 +707,11 @@ typedef struct WalProposerConfig char *neon_timeline; /* - * Comma-separated list of safekeepers, in the following format: - * host1:port1,host2:port2,host3:port3 + * Comma-separated list of safekeeper connection strings * * This cstr should be editable. */ - char *safekeepers_list; + char *safekeeper_connstrings; /* * WalProposer reconnects to offline safekeepers once in this interval. @@ -788,14 +787,15 @@ typedef struct WalProposer /* * Generation of the membership conf of which safekeepers[] are presumably * members. To make cplane life a bit easier and have more control in - * tests with which sks walproposer gets connected neon.safekeepers GUC - * doesn't provide full mconf, only the list of endpoints to connect to. - * We still would like to know generation associated with it because 1) we - * need some handle to enforce using generations in walproposer, and - * non-zero value of this serves the purpose; 2) currently we don't do - * that, but in theory walproposer can update list of safekeepers to - * connect to upon receiving mconf from safekeepers, and generation number - * must be checked to see which list is newer. + * tests with which sks walproposer gets connected + * neon.safekeeper_connstrings GUC doesn't provide full mconf, only the list + * of endpoints to connect to. We still would like to know generation + * associated with it because 1) we need some handle to enforce using + * generations in walproposer, and non-zero value of this serves the + * purpose; 2) currently we don't do that, but in theory walproposer can + * update list of safekeepers to connect to upon receiving mconf from + * safekeepers, and generation number must be checked to see which list is + * newer. */ Generation safekeepers_generation; /* Number of occupied slots in safekeepers[] */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 17582405db..97cb6a16e0 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -63,6 +63,7 @@ char *wal_acceptors_list = ""; int wal_acceptor_reconnect_timeout = 1000; int wal_acceptor_connection_timeout = 10000; +char *safekeeper_connstrings = ""; int safekeeper_proto_version = 3; /* Set to true in the walproposer bgw. */ @@ -81,6 +82,7 @@ static HotStandbyFeedback agg_hs_feedback; static void nwp_shmem_startup_hook(void); static void nwp_register_gucs(void); static void assign_neon_safekeepers(const char *newval, void *extra); +static void assign_neon_safekeeper_connstrings(const char *newval, void *extra); static void nwp_prepare_shmem(void); static uint64 backpressure_lag_impl(void); static uint64 startup_backpressure_wrap(void); @@ -117,8 +119,11 @@ init_walprop_config(bool syncSafekeepers) { walprop_config.neon_tenant = neon_tenant; walprop_config.neon_timeline = neon_timeline; + /* TODO(tristan957): Remove this compatibility code after the control plane + * is updated to pass neon.safekeeper_connstrings + */ /* WalProposerCreate scribbles directly on it, so pstrdup */ - walprop_config.safekeepers_list = pstrdup(wal_acceptors_list); + walprop_config.safekeeper_connstrings = pstrdup(wal_acceptors_list[0] == '\0' ? 
safekeeper_connstrings : wal_acceptors_list); walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout; walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout; walprop_config.wal_segment_size = wal_segment_size; @@ -203,6 +208,15 @@ nwp_register_gucs(void) * GUC_LIST_QUOTE */ NULL, assign_neon_safekeepers, NULL); + DefineCustomStringVariable( + "neon.safekeeper_connstrings", + "Comma-separated list of safekeeper connection strings with an optional generation prefix of the form g#X:", + NULL, + &safekeeper_connstrings, + "", + PGC_SIGHUP, + GUC_LIST_INPUT, + NULL, assign_neon_safekeeper_connstrings, NULL); DefineCustomIntVariable( "neon.safekeeper_reconnect_timeout", "Walproposer reconnects to offline safekeepers once in this interval.", @@ -236,24 +250,24 @@ nwp_register_gucs(void) static int -split_safekeepers_list(char *safekeepers_list, char *safekeepers[]) +split_safekeeper_connstrings(char *connstrings, char *safekeepers[]) { int n_safekeepers = 0; - char *curr_sk = safekeepers_list; + char *curr_sk = connstrings; - for (char *coma = safekeepers_list; coma != NULL && *coma != '\0'; curr_sk = coma) + for (char *comma = connstrings; comma != NULL && *comma != '\0'; curr_sk = comma) { if (++n_safekeepers >= MAX_SAFEKEEPERS) { wpg_log(FATAL, "too many safekeepers"); } - coma = strchr(coma, ','); + comma = strchr(comma, ','); safekeepers[n_safekeepers - 1] = curr_sk; - if (coma != NULL) + if (comma != NULL) { - *coma++ = '\0'; + *comma++ = '\0'; } } @@ -261,19 +275,19 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[]) } /* - * Accept two coma-separated strings with list of safekeeper host:port addresses. + * Accept two comma-separated strings with list of safekeeper host:port addresses. * Split them into arrays and return false if two sets do not match, ignoring the order. */ static bool -safekeepers_cmp(char *old, char *new) +safekeeper_connstrings_cmp(char *old, char *new) { char *safekeepers_old[MAX_SAFEKEEPERS]; char *safekeepers_new[MAX_SAFEKEEPERS]; int len_old = 0; int len_new = 0; - len_old = split_safekeepers_list(old, safekeepers_old); - len_new = split_safekeepers_list(new, safekeepers_new); + len_old = split_safekeeper_connstrings(old, safekeepers_old); + len_new = split_safekeeper_connstrings(new, safekeepers_new); if (len_old != len_new) { @@ -294,6 +308,44 @@ safekeepers_cmp(char *old, char *new) return true; } +/* + * GUC assign_hook for neon.safekeeper_connstrings. Restarts walproposer through + * FATAL if the list changed. + */ +static void +assign_neon_safekeeper_connstrings(const char *newval, void *extra) +{ + char *newval_copy; + char *oldval; + + if (!am_walproposer) + return; + + if (!newval) + { + /* should never happen */ + wpg_log(FATAL, "neon.safekeeper_connstrings is empty"); + } + + /* Copy values because we will modify them in split_safekeeper_connstrings() */ + newval_copy = pstrdup(newval); + oldval = pstrdup(safekeeper_connstrings); + + /* + * TODO: restarting through FATAL is stupid and introduces 1s delay before + * next bgw start. We should refactor walproposer to allow graceful exit + * and thus remove this delay. XXX: If you change anything here, sync with + * test_safekeepers_reconfigure_reorder. + */ + if (!safekeeper_connstrings_cmp(oldval, newval_copy)) + { + wpg_log(FATAL, "restarting walproposer to change safekeeper list from \"%s\" to \"%s\"", + safekeeper_connstrings, newval); + } + pfree(newval_copy); + pfree(oldval); +} + /* * GUC assign_hook for neon.safekeepers. 
Restarts walproposer through FATAL if * the list changed. @@ -323,7 +375,7 @@ assign_neon_safekeepers(const char *newval, void *extra) * and thus remove this delay. XXX: If you change anything here, sync with * test_safekeepers_reconfigure_reorder. */ - if (!safekeepers_cmp(oldval, newval_copy)) + if (!safekeeper_connstrings_cmp(oldval, newval_copy)) { wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s", wal_acceptors_list, newval); @@ -500,8 +552,8 @@ walprop_register_bgworker(void) { BackgroundWorker bgw; - /* If no wal acceptors are specified, don't start the background worker. */ - if (*wal_acceptors_list == '\0') + /* If no safekeepers are specified, don't start the background worker. */ + if (*safekeeper_connstrings == '\0') return; memset(&bgw, 0, sizeof(bgw)); @@ -841,7 +893,7 @@ walprop_status(Safekeeper *sk) } WalProposerConn * -libpqwp_connect_start(char *conninfo) +libpqwp_connect_start(const char *conninfo) { PGconn *pg_conn; diff --git a/safekeeper/tests/walproposer_sim/simulation.rs b/safekeeper/tests/walproposer_sim/simulation.rs index f314143952..2db28e09dc 100644 --- a/safekeeper/tests/walproposer_sim/simulation.rs +++ b/safekeeper/tests/walproposer_sim/simulation.rs @@ -86,7 +86,7 @@ impl WalProposer { let config = Config { ttid, - safekeepers_list: addrs, + safekeeper_connstrings: addrs, safekeeper_reconnect_timeout: 1000, safekeeper_connection_timeout: 5000, sync_safekeepers, diff --git a/safekeeper/tests/walproposer_sim/walproposer_api.rs b/safekeeper/tests/walproposer_sim/walproposer_api.rs index 82e7a32881..7ec665c839 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_api.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_api.rs @@ -207,7 +207,7 @@ impl SimulationApi { // initialize connection state for each safekeeper let sk_conns = args .config - .safekeepers_list + .safekeeper_connstrings .iter() .map(|s| { SafekeeperConn::new( diff --git a/test_runner/regress/test_config.py b/test_runner/regress/test_config.py index 7a0e4cb3d2..ec79776973 100644 --- a/test_runner/regress/test_config.py +++ b/test_runner/regress/test_config.py @@ -49,9 +49,9 @@ def test_safekeepers_reconfigure_reorder( old_sks = "" with closing(endpoint.connect()) as conn: with conn.cursor() as cur: - cur.execute("SHOW neon.safekeepers") + cur.execute("SHOW neon.safekeeper_connstrings") res = cur.fetchone() - assert res is not None, "neon.safekeepers GUC is set" + assert res is not None, "neon.safekeeper_connstrings GUC is set" old_sks = res[0] # Reorder safekeepers @@ -62,9 +62,9 @@ def test_safekeepers_reconfigure_reorder( with closing(endpoint.connect()) as conn: with conn.cursor() as cur: - cur.execute("SHOW neon.safekeepers") + cur.execute("SHOW neon.safekeeper_connstrings") res = cur.fetchone() - assert res is not None, "neon.safekeepers GUC is set" + assert res is not None, "neon.safekeeper_connstrings GUC is set" new_sks = res[0] assert new_sks != old_sks, "GUC changes were applied" diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index a9a6699e5c..ceb4de4381 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -14,7 +14,7 @@ from contextlib import closing from dataclasses import dataclass, field from functools import partial from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast import psycopg2 import psycopg2.errors @@ -685,7 +685,7 @@ class ProposerPostgres(PgProtocol): f"neon.timeline_id = 
'{self.timeline_id}'\n", f"neon.tenant_id = '{self.tenant_id}'\n", "neon.pageserver_connstring = ''\n", - f"neon.safekeepers = '{safekeepers}'\n", + f"neon.safekeeper_connstrings = '{safekeepers}'\n", f"listen_addresses = '{self.listen_addr}'\n", f"port = '{self.port}'\n", ] @@ -1464,7 +1464,15 @@ class SafekeeperEnv: def get_safekeeper_connstrs(self): assert self.safekeepers is not None, "safekeepers are not initialized" - return ",".join([sk_proc.args[2] for sk_proc in self.safekeepers]) + + def to_connstring(proc: subprocess.CompletedProcess[Any]) -> str: + """ + Parse a host:port string into a Postgres connection string + """ + (host, port, *_) = cast("str", proc.args[2]).split(":") + return f"host={host} port={port}" + + return ",".join([to_connstring(sk_proc) for sk_proc in self.safekeepers]) def create_postgres(self): assert self.tenant_id is not None, "tenant_id is not initialized" @@ -2000,8 +2008,9 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder): def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): """ - Test that having neon.safekeepers starting with g#n: with non zero n enables - generations, which as a side effect disables automatic timeline creation. + Test that having neon.safekeeper_connstrings starting with g#n: with non + zero n enables generations, which as a side effect disables automatic + timeline creation. This is kind of bootstrapping test: here membership conf & timeline is created manually, later storcon will do that. @@ -2032,10 +2041,10 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): def test_explicit_timeline_creation_storcon(neon_env_builder: NeonEnvBuilder): """ - Test that having neon.safekeepers starting with g#n: with non zero n enables - generations, which as a side effect disables automatic timeline creation. - Like test_explicit_timeline_creation, but asks the storcon to - create membership conf & timeline. + Test that having neon.safekeeper_connstrings starting with g#n: with non + zero n enables generations, which as a side effect disables automatic + timeline creation. Like test_explicit_timeline_creation, but asks the + storcon to create membership conf & timeline. """ neon_env_builder.num_safekeepers = 3 neon_env_builder.storage_controller_config = { diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index c5dd34f64f..09730ac2e2 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -711,10 +711,10 @@ async def run_quorum_sanity(env: NeonEnv): await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [2], False) -# Test various combinations of membership configurations / neon.safekeepers -# (list of safekeepers endpoint connects to) values / up & down safekeepers and -# check that endpont can start and write data when we have quorum and can't when -# we don't. +# Test various combinations of membership configurations / +# neon.safekeeper_connstrings (list of safekeeper connection strings) values / +# up & down safekeepers and check that endpoint can start and write data when we +# have quorum and can't when we don't. def test_quorum_sanity(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 4 env = neon_env_builder.init_start()
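Editorial footnote on the walproposer.c hunk above: during the transition each list element may still be a legacy `host:port` pair or a full libpq keyword/value connection string, and only the host and port are pulled out for identification. The Rust sketch below mirrors that dual-format handling under our own, hypothetical names; the actual C code uses PQconninfoParse for the keyword/value case rather than splitting tokens by hand.

```rust
// Hypothetical sketch, not code from this PR: accept either the legacy
// "host:port" form or a libpq keyword/value string and extract host/port.
#[derive(Debug, PartialEq)]
struct SafekeeperAddr {
    host: String,
    port: String,
}

fn parse_safekeeper_entry(entry: &str) -> Option<SafekeeperAddr> {
    // Like the patch, treat the presence of ':' as the legacy host:port form.
    if let Some((host, port)) = entry.split_once(':') {
        return Some(SafekeeperAddr {
            host: host.to_owned(),
            port: port.to_owned(),
        });
    }
    // Otherwise read "host" and "port" out of the keyword/value pairs.
    // (PQconninfoParse also handles quoting and defaults; this does not.)
    let mut host = None;
    let mut port = None;
    for kv in entry.split_whitespace() {
        match kv.split_once('=')? {
            ("host", v) => host = Some(v.to_owned()),
            ("port", v) => port = Some(v.to_owned()),
            _ => {}
        }
    }
    Some(SafekeeperAddr {
        host: host?,
        port: port?,
    })
}

fn main() {
    // Both spellings of the same safekeeper should resolve identically.
    assert_eq!(
        parse_safekeeper_entry("localhost:5000"),
        parse_safekeeper_entry("host=localhost port=5000")
    );
    println!("{:?}", parse_safekeeper_entry("host=127.0.0.1 port=6502"));
}
```

Note that the colon heuristic (in the patch and in this sketch) would misclassify a keyword/value string that itself contains a colon, for example inside an options clause; the TODO(tristan957) comments indicate the legacy branch is meant to be removed once the control plane always sends `neon.safekeeper_connstrings`.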