embed everything into application_name, finish storage-side code

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2025-03-11 17:28:00 -04:00
parent 4d99b0df58
commit b86c4fabb4
4 changed files with 57 additions and 34 deletions

View File

@@ -855,6 +855,22 @@ impl ComputeNode {
info!("Storage auth token not set");
}
if let Some(spec) = &compute_state.pspec {
match spec.spec.mode {
ComputeMode::Primary => {
config.application_name("compute_ctl-primary");
}
ComputeMode::Replica => {
config.application_name("compute_ctl-replica");
}
ComputeMode::Static(_) => {
config.application_name("compute_ctl-static");
}
}
} else {
config.application_name("compute_ctl");
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;

View File

@@ -5,7 +5,6 @@ use std::io;
use std::io::Write;
use std::io::prelude::*;
use std::path::Path;
use url::Url;
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
@@ -54,32 +53,7 @@ pub fn write_postgres_conf(
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
let connstr = if s.starts_with("postgres://") || s.starts_with("postgresql://") {
if let Ok(mut url) = Url::parse(s) {
let mode_desc = match spec.mode {
ComputeMode::Primary => "primary",
ComputeMode::Replica => "replica",
ComputeMode::Static(_) => "static",
};
url.query_pairs_mut().append_pair("compute_type", mode_desc);
url.to_string()
} else {
tracing::warn!(
"failed to add tracking info to pageserver_connstring {s}: cannot parse the URL"
);
s.clone()
}
} else {
tracing::warn!(
"failed to add tracking info to pageserver_connstring {s}: it doesn't start with postgres://"
);
s.clone()
};
writeln!(
file,
"neon.pageserver_connstring={}",
escape_conf_value(&connstr)
)?;
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
@@ -126,13 +100,17 @@ pub fn write_postgres_conf(
}
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Primary => {
writeln!(file, "neon.compute_type=primary")?;
}
ComputeMode::Static(lsn) => {
writeln!(file, "neon.compute_type=static")?;
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;
writeln!(file, "recovery_target_lsn='{lsn}'")?;
}
ComputeMode::Replica => {
writeln!(file, "neon.compute_type=replica")?;
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;
}

View File

@@ -356,6 +356,8 @@ struct PageServerHandler {
pipelining_config: PageServicePipeliningConfig,
gate_guard: GateGuard,
application_info: Option<(String, Option<String>)>,
}
struct TimelineHandles {
@@ -710,6 +712,7 @@ impl PageServerHandler {
cancel,
pipelining_config,
gate_guard,
application_info: None,
}
}
@@ -2547,14 +2550,19 @@ where
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
if let Some((app_name, compute_type)) = app_name.split_once('-') {
self.application_info =
Some((app_name.to_string(), Some(compute_type.to_string())));
} else {
self.application_info = Some((app_name.to_string(), None));
}
}
};
Ok(())
}
#[instrument(skip_all, fields(tenant_id, timeline_id))]
#[instrument(skip_all, fields(tenant_id, timeline_id, compute_type))]
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
@@ -2567,6 +2575,13 @@ where
fail::fail_point!("ps::connection-start::process-query");
if let Some((app_name, compute_type)) = &self.application_info {
tracing::Span::current().record("application", field::display(app_name));
if let Some(compute_type) = compute_type {
tracing::Span::current().record("compute_type", field::display(compute_type));
}
}
let ctx = self.connection_ctx.attached_child();
debug!("process query {query_string}");
let query = PageServiceCmd::parse(query_string)?;

View File

@@ -53,6 +53,7 @@
/* GUCs */
char *neon_timeline;
char *neon_tenant;
char *neon_compute_type;
int32 max_cluster_size;
char *page_server_connstring;
char *neon_auth_token;
@@ -390,9 +391,9 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
case PS_Disconnected:
{
const char *keywords[4];
const char *values[4];
char pid_str[16];
const char *keywords[5];
const char *values[5];
char pid_str[24];
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
@@ -445,7 +446,11 @@ pageserver_connect(shardno_t shard_no, int elevel)
*/
keywords[n_pgsql_params] = "application_name";
{
int ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
int ret;
if (neon_compute_type)
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, neon_compute_type);
else
ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
if (ret < 0 || ret >= (int)(sizeof(pid_str)))
elog(FATAL, "stack-allocated buffer too small to hold pid");
}
@@ -1370,6 +1375,15 @@ pg_init_libpagestore(void)
GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomStringVariable("neon.compute_type",
"The compute node type",
NULL,
&neon_compute_type,
"",
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
relsize_hash_init();
if (page_server != NULL)