feat(compute_ctl): pass compute type to pageserver with pg_options (#11287)

## Problem

Second attempt at https://github.com/neondatabase/neon/pull/11185; part of
https://github.com/neondatabase/cloud/issues/24706.

## Summary of changes

Tristan reminded me of the `options` field of the pg wire protocol,
which can be used to pass configuration settings. This patch adds parsing of
that field on the pageserver side, and supplies `neon.compute_mode` as part of
the `options`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-03-20 11:48:40 -04:00
committed by GitHub
parent 65d690b21d
commit 78502798ae
5 changed files with 180 additions and 4 deletions

View File

@@ -896,6 +896,14 @@ impl ComputeNode {
info!("Storage auth token not set");
}
config.application_name("compute_ctl");
if let Some(spec) = &compute_state.pspec {
config.options(&format!(
"-c neon.compute_mode={}",
spec.spec.mode.to_type_str()
));
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;

View File

@@ -117,6 +117,7 @@ pub fn write_postgres_conf(
writeln!(file, "lc_numeric='C.UTF-8'")?;
}
writeln!(file, "neon.compute_mode={}", spec.mode.to_type_str())?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Static(lsn) => {

View File

@@ -269,6 +269,18 @@ pub enum ComputeMode {
Replica,
}
impl ComputeMode {
/// Convert the compute mode to a string that can be used to identify the type of compute,
/// which means that if it's a static compute, the LSN will not be included.
pub fn to_type_str(&self) -> &'static str {
match self {
ComputeMode::Primary => "primary",
ComputeMode::Static(_) => "static",
ComputeMode::Replica => "replica",
}
}
}
/// Log level for audit logging
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeAudit {

View File

@@ -237,7 +237,7 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
#[instrument(skip_all, fields(peer_addr, application_name))]
#[instrument(skip_all, fields(peer_addr, application_name, compute_mode))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
conf: &'static PageServerConf,
@@ -2512,6 +2512,58 @@ impl PageServiceCmd {
}
}
/// Parse the startup options from the postgres wire protocol startup packet.
///
/// It takes a sequence of `-c option=X` or `-coption=X`. It parses the options string
/// by best effort and returns all the options parsed (key-value pairs) and a bool indicating
/// whether all options are successfully parsed. There could be duplicates in the options
/// if the caller passed such parameters.
fn parse_options(options: &str) -> (Vec<(String, String)>, bool) {
let mut parsing_config = false;
let mut has_error = false;
let mut config = Vec::new();
for item in options.split_whitespace() {
if item == "-c" {
if !parsing_config {
parsing_config = true;
} else {
// "-c" followed with another "-c"
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
} else if item.starts_with("-c") || parsing_config {
let Some((mut key, value)) = item.split_once('=') else {
// "-c" followed with an invalid option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
if !parsing_config {
// Parse "-coptions=X"
let Some(stripped_key) = key.strip_prefix("-c") else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
};
key = stripped_key;
}
config.push((key.to_string(), value.to_string()));
parsing_config = false;
} else {
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
break;
}
}
if parsing_config {
// "-c" without the option
tracing::warn!("failed to parse the startup options: {options}");
has_error = true;
}
(config, has_error)
}
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
@@ -2556,6 +2608,14 @@ where
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
if let Some(options) = params.get("options") {
let (config, _) = parse_options(options);
for (key, value) in config {
if key == "neon.compute_mode" {
Span::current().record("compute_mode", field::display(value));
}
}
}
};
Ok(())
@@ -2669,6 +2729,7 @@ where
PageServiceCmd::Set => {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
// TODO: allow setting options, i.e., application_name/compute_mode via SET commands
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
PageServiceCmd::LeaseLsn(LeaseLsnCmd {
@@ -2943,4 +3004,46 @@ mod tests {
let cmd = PageServiceCmd::parse(&format!("lease {tenant_id} {timeline_id} gzip 0/16ABCDE"));
assert!(cmd.is_err());
}
#[test]
fn test_parse_options() {
    /// Turns borrowed pairs into the owned form returned by `parse_options`.
    fn owned(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    // Well-formed inputs: both the parsed pairs and the error flag are checked.
    let accepted: [(&str, &[(&str, &str)]); 3] = [
        (
            " -c neon.compute_mode=primary ",
            &[("neon.compute_mode", "primary")],
        ),
        (
            " -c neon.compute_mode=primary -c foo=bar ",
            &[("neon.compute_mode", "primary"), ("foo", "bar")],
        ),
        (
            " -c neon.compute_mode=primary -cfoo=bar",
            &[("neon.compute_mode", "primary"), ("foo", "bar")],
        ),
    ];
    for &(input, expected) in &accepted {
        let (config, has_error) = parse_options(input);
        assert!(!has_error);
        assert_eq!(config, owned(expected));
    }

    // Inputs where only the error flag matters.
    let flag_only: [(&str, bool); 4] = [
        ("-c", true),
        ("-c foo=bar -c -c", true),
        (" ", false),
        (" -c neon.compute_mode", true),
    ];
    for &(input, expect_error) in &flag_only {
        let (_, has_error) = parse_options(input);
        if expect_error {
            assert!(has_error);
        } else {
            assert!(!has_error);
        }
    }
}
}

View File

@@ -50,6 +50,20 @@
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
/*
 * Possible values of the neon.compute_mode GUC, i.e. the kind of compute
 * endpoint this Postgres instance is running as.
 *
 * CP_MODE_PRIMARY is pinned to 0 so that it coincides with the
 * zero-initialized backing variable of the GUC.
 */
enum NeonComputeMode {
CP_MODE_PRIMARY = 0,
CP_MODE_REPLICA,
CP_MODE_STATIC
};
/*
 * Lookup table for the neon.compute_mode enum GUC: maps each accepted string
 * value to its NeonComputeMode constant. The list is terminated by an entry
 * whose name is NULL.
 *
 * NOTE(review): the third member of each entry is presumably the "hidden"
 * flag of PostgreSQL's struct config_enum_entry -- confirm against guc.h.
 */
static const struct config_enum_entry neon_compute_modes[] = {
{"primary", CP_MODE_PRIMARY, false},
{"replica", CP_MODE_REPLICA, false},
{"static", CP_MODE_STATIC, false},
{NULL, 0, false}
};
/* GUCs */
char *neon_timeline;
char *neon_tenant;
@@ -62,6 +76,7 @@ int flush_every_n_requests = 8;
int neon_protocol_version = 2;
static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
@@ -390,9 +405,10 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
case PS_Disconnected:
{
const char *keywords[4];
const char *values[4];
char pid_str[16];
const char *keywords[5];
const char *values[5];
char pid_str[16] = { 0 };
char endpoint_str[36] = { 0 };
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
@@ -464,6 +480,31 @@ pageserver_connect(shardno_t shard_no, int elevel)
n_pgsql_params++;
}
{
bool param_set = false;
switch (neon_compute_mode)
{
case CP_MODE_PRIMARY:
strncpy(endpoint_str, "-c neon.compute_mode=primary", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_REPLICA:
strncpy(endpoint_str, "-c neon.compute_mode=replica", sizeof(endpoint_str));
param_set = true;
break;
case CP_MODE_STATIC:
strncpy(endpoint_str, "-c neon.compute_mode=static", sizeof(endpoint_str));
param_set = true;
break;
}
if (param_set)
{
keywords[n_pgsql_params] = "options";
values[n_pgsql_params] = endpoint_str;
n_pgsql_params++;
}
}
keywords[n_pgsql_params] = NULL;
values[n_pgsql_params] = NULL;
@@ -1391,6 +1432,17 @@ pg_init_libpagestore(void)
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"neon.compute_mode",
"The compute endpoint node type",
NULL,
&neon_compute_mode,
CP_MODE_PRIMARY,
neon_compute_modes,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)