mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 07:30:37 +00:00
@@ -856,17 +856,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
if let Some(spec) = &compute_state.pspec {
|
||||
match spec.spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
config.application_name("compute_ctl-primary");
|
||||
}
|
||||
ComputeMode::Replica => {
|
||||
config.application_name("compute_ctl-replica");
|
||||
}
|
||||
ComputeMode::Static(_) => {
|
||||
config.application_name("compute_ctl-static");
|
||||
}
|
||||
}
|
||||
config.application_name(&format!("compute_ctl-{}", spec.spec.mode.to_type_str()));
|
||||
} else {
|
||||
config.application_name("compute_ctl");
|
||||
}
|
||||
|
||||
@@ -99,18 +99,15 @@ pub fn write_postgres_conf(
|
||||
writeln!(file, "lc_numeric='C.UTF-8'")?;
|
||||
}
|
||||
|
||||
writeln!(file, "neon.endpoint_type={}", spec.mode.to_type_str())?;
|
||||
match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
writeln!(file, "neon.compute_type=primary")?;
|
||||
}
|
||||
ComputeMode::Primary => {}
|
||||
ComputeMode::Static(lsn) => {
|
||||
writeln!(file, "neon.compute_type=static")?;
|
||||
// hot_standby is 'on' by default, but let's be explicit
|
||||
writeln!(file, "hot_standby=on")?;
|
||||
writeln!(file, "recovery_target_lsn='{lsn}'")?;
|
||||
}
|
||||
ComputeMode::Replica => {
|
||||
writeln!(file, "neon.compute_type=replica")?;
|
||||
// hot_standby is 'on' by default, but let's be explicit
|
||||
writeln!(file, "hot_standby=on")?;
|
||||
}
|
||||
|
||||
@@ -272,6 +272,18 @@ pub enum ComputeMode {
|
||||
Replica,
|
||||
}
|
||||
|
||||
impl ComputeMode {
|
||||
/// Convert the compute mode to a string that can be used to identify the type of compute,
|
||||
/// which means that if it's a static compute, the LSN will not be included.
|
||||
pub fn to_type_str(&self) -> &'static str {
|
||||
match self {
|
||||
ComputeMode::Primary => "primary",
|
||||
ComputeMode::Static(_) => "static",
|
||||
ComputeMode::Replica => "replica",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Log level for audit logging
|
||||
/// Disabled, log, hipaa
|
||||
/// Default is Disabled
|
||||
|
||||
@@ -356,8 +356,6 @@ struct PageServerHandler {
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
|
||||
gate_guard: GateGuard,
|
||||
|
||||
application_info: Option<(String, Option<String>)>,
|
||||
}
|
||||
|
||||
struct TimelineHandles {
|
||||
@@ -712,7 +710,6 @@ impl PageServerHandler {
|
||||
cancel,
|
||||
pipelining_config,
|
||||
gate_guard,
|
||||
application_info: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2550,19 +2547,14 @@ where
|
||||
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
if let Some((app_name, compute_type)) = app_name.split_once('-') {
|
||||
self.application_info =
|
||||
Some((app_name.to_string(), Some(compute_type.to_string())));
|
||||
} else {
|
||||
self.application_info = Some((app_name.to_string(), None));
|
||||
}
|
||||
Span::current().record("application_name", field::display(app_name));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id, timeline_id, compute_type))]
|
||||
#[instrument(skip_all, fields(tenant_id, timeline_id))]
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
@@ -2575,13 +2567,6 @@ where
|
||||
|
||||
fail::fail_point!("ps::connection-start::process-query");
|
||||
|
||||
if let Some((app_name, compute_type)) = &self.application_info {
|
||||
tracing::Span::current().record("application", field::display(app_name));
|
||||
if let Some(compute_type) = compute_type {
|
||||
tracing::Span::current().record("compute_type", field::display(compute_type));
|
||||
}
|
||||
}
|
||||
|
||||
let ctx = self.connection_ctx.attached_child();
|
||||
debug!("process query {query_string}");
|
||||
let query = PageServiceCmd::parse(query_string)?;
|
||||
|
||||
@@ -50,10 +50,25 @@
|
||||
#define MIN_RECONNECT_INTERVAL_USEC 1000
|
||||
#define MAX_RECONNECT_INTERVAL_USEC 1000000
|
||||
|
||||
|
||||
enum NeonEndpointType {
|
||||
EP_TYPE_UNKNOWN,
|
||||
EP_TYPE_PRIMARY,
|
||||
EP_TYPE_REPLICA,
|
||||
EP_TYPE_STATIC
|
||||
};
|
||||
|
||||
static const struct config_enum_entry neon_endpoint_types[] = {
|
||||
{"unknown", EP_TYPE_UNKNOWN, false},
|
||||
{"primary", EP_TYPE_PRIMARY, false},
|
||||
{"replica", EP_TYPE_REPLICA, false},
|
||||
{"static", EP_TYPE_STATIC, false},
|
||||
{NULL, 0, false}
|
||||
};
|
||||
|
||||
/* GUCs */
|
||||
char *neon_timeline;
|
||||
char *neon_tenant;
|
||||
char *neon_compute_type;
|
||||
int32 max_cluster_size;
|
||||
char *page_server_connstring;
|
||||
char *neon_auth_token;
|
||||
@@ -63,6 +78,8 @@ int flush_every_n_requests = 8;
|
||||
|
||||
int neon_protocol_version = 2;
|
||||
|
||||
static int neon_endpoint_type = 0;
|
||||
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
|
||||
@@ -391,8 +408,8 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
case PS_Disconnected:
|
||||
{
|
||||
const char *keywords[5];
|
||||
const char *values[5];
|
||||
const char *keywords[4];
|
||||
const char *values[4];
|
||||
char pid_str[24];
|
||||
int n_pgsql_params;
|
||||
TimestampTz now;
|
||||
@@ -447,10 +464,21 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
keywords[n_pgsql_params] = "application_name";
|
||||
{
|
||||
int ret;
|
||||
if (neon_compute_type)
|
||||
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, neon_compute_type);
|
||||
else
|
||||
ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
|
||||
switch (neon_endpoint_type)
|
||||
{
|
||||
case EP_TYPE_PRIMARY:
|
||||
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "primary");
|
||||
break;
|
||||
case EP_TYPE_REPLICA:
|
||||
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "replica");
|
||||
break;
|
||||
case EP_TYPE_STATIC:
|
||||
ret = snprintf(pid_str, sizeof(pid_str), "%d-%s", MyProcPid, "static");
|
||||
break;
|
||||
default:
|
||||
ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
|
||||
break;
|
||||
}
|
||||
if (ret < 0 || ret >= (int)(sizeof(pid_str)))
|
||||
elog(FATAL, "stack-allocated buffer too small to hold pid");
|
||||
}
|
||||
@@ -1375,14 +1403,16 @@ pg_init_libpagestore(void)
|
||||
GUC_UNIT_MS,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomStringVariable("neon.compute_type",
|
||||
"The compute node type",
|
||||
NULL,
|
||||
&neon_compute_type,
|
||||
"",
|
||||
PGC_POSTMASTER,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomEnumVariable(
|
||||
"neon.endpoint_type",
|
||||
"The compute endpoint node type",
|
||||
NULL,
|
||||
&neon_endpoint_type,
|
||||
EP_TYPE_UNKNOWN,
|
||||
neon_endpoint_types,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user