From 4077eeecaae9f13c3d3583eb436a836956320662 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 13 Mar 2025 16:42:04 -0400 Subject: [PATCH] resolve comments Signed-off-by: Alex Chi Z --- compute_tools/src/compute.rs | 12 +------ compute_tools/src/config.rs | 7 ++-- libs/compute_api/src/spec.rs | 12 +++++++ pageserver/src/page_service.rs | 19 ++--------- pgxn/neon/libpagestore.c | 60 +++++++++++++++++++++++++--------- 5 files changed, 62 insertions(+), 48 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d7351bb98f..bf5f0a09c9 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -856,17 +856,7 @@ impl ComputeNode { } if let Some(spec) = &compute_state.pspec { - match spec.spec.mode { - ComputeMode::Primary => { - config.application_name("compute_ctl-primary"); - } - ComputeMode::Replica => { - config.application_name("compute_ctl-replica"); - } - ComputeMode::Static(_) => { - config.application_name("compute_ctl-static"); - } - } + config.application_name(&format!("compute_ctl-{}", spec.spec.mode.to_type_str())); } else { config.application_name("compute_ctl"); } diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index ef656fcc90..1de221c435 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -99,18 +99,15 @@ pub fn write_postgres_conf( writeln!(file, "lc_numeric='C.UTF-8'")?; } + writeln!(file, "neon.endpoint_type={}", spec.mode.to_type_str())?; match spec.mode { - ComputeMode::Primary => { - writeln!(file, "neon.compute_type=primary")?; - } + ComputeMode::Primary => {} ComputeMode::Static(lsn) => { - writeln!(file, "neon.compute_type=static")?; // hot_standby is 'on' by default, but let's be explicit writeln!(file, "hot_standby=on")?; writeln!(file, "recovery_target_lsn='{lsn}'")?; } ComputeMode::Replica => { - writeln!(file, "neon.compute_type=replica")?; // hot_standby is 'on' by default, but let's be explicit writeln!(file, "hot_standby=on")?; } diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 77f2e1e631..2b166d0f11 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -272,6 +272,18 @@ pub enum ComputeMode { Replica, } +impl ComputeMode { + /// Convert the compute mode to a string that can be used to identify the type of compute, + /// which means that if it's a static compute, the LSN will not be included. + pub fn to_type_str(&self) -> &'static str { + match self { + ComputeMode::Primary => "primary", + ComputeMode::Static(_) => "static", + ComputeMode::Replica => "replica", + } + } +} + /// Log level for audit logging /// Disabled, log, hipaa /// Default is Disabled diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 4870348e47..f2d2ab05ad 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -356,8 +356,6 @@ struct PageServerHandler { pipelining_config: PageServicePipeliningConfig, gate_guard: GateGuard, - - application_info: Option<(String, Option)>, } struct TimelineHandles { @@ -712,7 +710,6 @@ impl PageServerHandler { cancel, pipelining_config, gate_guard, - application_info: None, } } @@ -2550,19 +2547,14 @@ where if let FeStartupPacket::StartupMessage { params, .. } = sm { if let Some(app_name) = params.get("application_name") { - if let Some((app_name, compute_type)) = app_name.split_once('-') { - self.application_info = - Some((app_name.to_string(), Some(compute_type.to_string()))); - } else { - self.application_info = Some((app_name.to_string(), None)); - } + Span::current().record("application_name", field::display(app_name)); } }; Ok(()) } - #[instrument(skip_all, fields(tenant_id, timeline_id, compute_type))] + #[instrument(skip_all, fields(tenant_id, timeline_id))] async fn process_query( &mut self, pgb: &mut PostgresBackend, @@ -2575,13 +2567,6 @@ where fail::fail_point!("ps::connection-start::process-query"); - if let Some((app_name, compute_type)) = &self.application_info { - tracing::Span::current().record("application", field::display(app_name)); - if let Some(compute_type) = compute_type { - tracing::Span::current().record("compute_type", field::display(compute_type)); - } - } - let ctx = self.connection_ctx.attached_child(); debug!("process query {query_string}"); let query = PageServiceCmd::parse(query_string)?; diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index fed3474f74..95680d4e84 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -50,10 +50,25 @@ #define MIN_RECONNECT_INTERVAL_USEC 1000 #define MAX_RECONNECT_INTERVAL_USEC 1000000 + +enum NeonEndpointType { + EP_TYPE_UNKNOWN, + EP_TYPE_PRIMARY, + EP_TYPE_REPLICA, + EP_TYPE_STATIC +}; + +static const struct config_enum_entry neon_endpoint_types[] = { + {"unknown", EP_TYPE_UNKNOWN, false}, + {"primary", EP_TYPE_PRIMARY, false}, + {"replica", EP_TYPE_REPLICA, false}, + {"static", EP_TYPE_STATIC, false}, + {NULL, 0, false} +}; + /* GUCs */ char *neon_timeline; char *neon_tenant; -char *neon_compute_type; int32 max_cluster_size; char *page_server_connstring; char *neon_auth_token; @@ -63,6 +78,8 @@ int flush_every_n_requests = 8; int neon_protocol_version = 2; +static int neon_endpoint_type = 0; + static int max_reconnect_attempts = 60; static int stripe_size; @@ -391,8 +408,8 @@ pageserver_connect(shardno_t shard_no, int elevel) { case PS_Disconnected: { - const char *keywords[5]; - const char *values[5]; + const char *keywords[4]; + const char *values[4]; char pid_str[24]; int n_pgsql_params; TimestampTz now; @@ -447,10 +464,21 @@ pageserver_connect(shardno_t shard_no, int elevel) keywords[n_pgsql_params] = "application_name"; { int ret; - if (neon_compute_type) - ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, neon_compute_type); - else - ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid); + switch (neon_endpoint_type) + { + case EP_TYPE_PRIMARY: + ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "primary"); + break; + case EP_TYPE_REPLICA: + ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "replica"); + break; + case EP_TYPE_STATIC: + ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "static"); + break; + default: + ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid); + break; + } if (ret < 0 || ret >= (int)(sizeof(pid_str))) elog(FATAL, "stack-allocated buffer too small to hold pid"); } @@ -1375,14 +1403,16 @@ pg_init_libpagestore(void) GUC_UNIT_MS, NULL, NULL, NULL); - DefineCustomStringVariable("neon.compute_type", - "The compute node type", - NULL, - &neon_compute_type, - "", - PGC_POSTMASTER, - 0, /* no flags required */ - NULL, NULL, NULL); + DefineCustomEnumVariable( + "neon.endpoint_type", + "The compute endpoint node type", + NULL, + &neon_endpoint_type, + EP_TYPE_UNKNOWN, + neon_endpoint_types, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); relsize_hash_init();