mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
Store basic endpoint info in endpoint.json file. (#4058)
It's more convenient than parsing the postgresql.conf file. Extracted from PR #3886. I started working on another patch (to make it safe to run two "neon_local endpoint create" commands concurrently), and realized that this change will make that simpler too.
This commit is contained in:
committed by
GitHub
parent
093fafd6bd
commit
4d55d61807
@@ -8,7 +8,7 @@
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::endpoint::Replication;
|
||||
use control_plane::endpoint::ComputeMode;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::pageserver::PageServerNode;
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
@@ -481,7 +481,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
timeline_id,
|
||||
None,
|
||||
pg_version,
|
||||
Replication::Primary,
|
||||
ComputeMode::Primary,
|
||||
)?;
|
||||
println!("Done");
|
||||
}
|
||||
@@ -568,8 +568,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.iter()
|
||||
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
|
||||
{
|
||||
let lsn_str = match endpoint.replication {
|
||||
Replication::Static(lsn) => {
|
||||
let lsn_str = match endpoint.mode {
|
||||
ComputeMode::Static(lsn) => {
|
||||
// -> read-only endpoint
|
||||
// Use the node's LSN.
|
||||
lsn.to_string()
|
||||
@@ -632,21 +632,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
let replication = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => Replication::Static(lsn),
|
||||
(None, true) => Replication::Replica,
|
||||
(None, false) => Replication::Primary,
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
(None, false) => ComputeMode::Primary,
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
};
|
||||
|
||||
cplane.new_endpoint(
|
||||
tenant_id,
|
||||
&endpoint_id,
|
||||
timeline_id,
|
||||
port,
|
||||
pg_version,
|
||||
replication,
|
||||
)?;
|
||||
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?;
|
||||
}
|
||||
"start" => {
|
||||
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
|
||||
@@ -670,11 +663,11 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.unwrap_or(false);
|
||||
|
||||
if let Some(endpoint) = endpoint {
|
||||
match (&endpoint.replication, hot_standby) {
|
||||
(Replication::Static(_), true) => {
|
||||
match (&endpoint.mode, hot_standby) {
|
||||
(ComputeMode::Static(_), true) => {
|
||||
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
|
||||
}
|
||||
(Replication::Primary, true) => {
|
||||
(ComputeMode::Primary, true) => {
|
||||
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
|
||||
}
|
||||
_ => {}
|
||||
@@ -701,10 +694,10 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.copied()
|
||||
.context("Failed to `pg-version` from the argument string")?;
|
||||
|
||||
let replication = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => Replication::Static(lsn),
|
||||
(None, true) => Replication::Replica,
|
||||
(None, false) => Replication::Primary,
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
(None, false) => ComputeMode::Primary,
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
};
|
||||
|
||||
@@ -721,7 +714,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
timeline_id,
|
||||
port,
|
||||
pg_version,
|
||||
replication,
|
||||
mode,
|
||||
)?;
|
||||
ep.start(&auth_token)?;
|
||||
}
|
||||
|
||||
@@ -11,15 +11,31 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
|
||||
use crate::local_env::LocalEnv;
|
||||
use crate::pageserver::PageServerNode;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
|
||||
// contents of a endpoint.json file
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
pub struct EndpointConf {
|
||||
name: String,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
tenant_id: TenantId,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
timeline_id: TimelineId,
|
||||
mode: ComputeMode,
|
||||
port: u16,
|
||||
pg_version: u32,
|
||||
}
|
||||
|
||||
//
|
||||
// ComputeControlPlane
|
||||
//
|
||||
@@ -70,7 +86,7 @@ impl ComputeControlPlane {
|
||||
timeline_id: TimelineId,
|
||||
port: Option<u16>,
|
||||
pg_version: u32,
|
||||
replication: Replication,
|
||||
mode: ComputeMode,
|
||||
) -> Result<Arc<Endpoint>> {
|
||||
let port = port.unwrap_or_else(|| self.get_port());
|
||||
|
||||
@@ -80,12 +96,22 @@ impl ComputeControlPlane {
|
||||
env: self.env.clone(),
|
||||
pageserver: Arc::clone(&self.pageserver),
|
||||
timeline_id,
|
||||
replication,
|
||||
mode,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
});
|
||||
|
||||
ep.create_pgdata()?;
|
||||
std::fs::write(
|
||||
ep.endpoint_path().join("endpoint.json"),
|
||||
serde_json::to_string_pretty(&EndpointConf {
|
||||
name: name.to_string(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
mode,
|
||||
port,
|
||||
pg_version,
|
||||
})?,
|
||||
)?;
|
||||
ep.setup_pg_conf()?;
|
||||
|
||||
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
|
||||
@@ -96,12 +122,13 @@ impl ComputeControlPlane {
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
pub enum Replication {
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum ComputeMode {
|
||||
// Regular read-write node
|
||||
Primary,
|
||||
// if recovery_target_lsn is provided, and we want to pin the node to a specific LSN
|
||||
Static(Lsn),
|
||||
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
|
||||
// Hot standby; read-only replica.
|
||||
// Future versions may want to distinguish between replicas with hot standby
|
||||
// feedback and other kinds of replication configurations.
|
||||
@@ -115,7 +142,7 @@ pub struct Endpoint {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
|
||||
pub replication: Replication,
|
||||
pub mode: ComputeMode,
|
||||
|
||||
// port and address of the Postgres server
|
||||
pub address: SocketAddr,
|
||||
@@ -144,50 +171,20 @@ impl Endpoint {
|
||||
let fname = entry.file_name();
|
||||
let name = fname.to_str().unwrap().to_string();
|
||||
|
||||
// Read config file into memory
|
||||
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
|
||||
let cfg_path_str = cfg_path.to_string_lossy();
|
||||
let mut conf_file = File::open(&cfg_path)
|
||||
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
|
||||
let conf = PostgresConf::read(&mut conf_file)
|
||||
.with_context(|| format!("failed to read config file in {}", cfg_path_str))?;
|
||||
|
||||
// Read a few options from the config file
|
||||
let context = format!("in config file {}", cfg_path_str);
|
||||
let port: u16 = conf.parse_field("port", &context)?;
|
||||
let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?;
|
||||
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
|
||||
|
||||
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
|
||||
// If it doesn't exist, assume broken data directory and use default pg version.
|
||||
let pg_version_path = entry.path().join("PG_VERSION");
|
||||
|
||||
let pg_version_str =
|
||||
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
|
||||
let pg_version = u32::from_str(&pg_version_str)?;
|
||||
|
||||
// parse recovery_target_lsn and primary_conninfo into Recovery Target, if any
|
||||
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
|
||||
Replication::Static(Lsn::from_str(lsn_str)?)
|
||||
} else if let Some(slot_name) = conf.get("primary_slot_name") {
|
||||
let slot_name = slot_name.to_string();
|
||||
let prefix = format!("repl_{}_", timeline_id);
|
||||
assert!(slot_name.starts_with(&prefix));
|
||||
Replication::Replica
|
||||
} else {
|
||||
Replication::Primary
|
||||
};
|
||||
// Read the endpoint.json file
|
||||
let conf: EndpointConf =
|
||||
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
|
||||
|
||||
// ok now
|
||||
Ok(Endpoint {
|
||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
|
||||
name,
|
||||
env: env.clone(),
|
||||
pageserver: Arc::clone(pageserver),
|
||||
timeline_id,
|
||||
replication,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
timeline_id: conf.timeline_id,
|
||||
mode: conf.mode,
|
||||
tenant_id: conf.tenant_id,
|
||||
pg_version: conf.pg_version,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -323,8 +320,8 @@ impl Endpoint {
|
||||
|
||||
conf.append_line("");
|
||||
// Replication-related configurations, such as WAL sending
|
||||
match &self.replication {
|
||||
Replication::Primary => {
|
||||
match &self.mode {
|
||||
ComputeMode::Primary => {
|
||||
// Configure backpressure
|
||||
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
|
||||
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
|
||||
@@ -366,10 +363,10 @@ impl Endpoint {
|
||||
conf.append("synchronous_standby_names", "pageserver");
|
||||
}
|
||||
}
|
||||
Replication::Static(lsn) => {
|
||||
ComputeMode::Static(lsn) => {
|
||||
conf.append("recovery_target_lsn", &lsn.to_string());
|
||||
}
|
||||
Replication::Replica => {
|
||||
ComputeMode::Replica => {
|
||||
assert!(!self.env.safekeepers.is_empty());
|
||||
|
||||
// TODO: use future host field from safekeeper spec
|
||||
@@ -409,8 +406,8 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
|
||||
let backup_lsn = match &self.replication {
|
||||
Replication::Primary => {
|
||||
let backup_lsn = match &self.mode {
|
||||
ComputeMode::Primary => {
|
||||
if !self.env.safekeepers.is_empty() {
|
||||
// LSN 0 means that it is bootstrap and we need to download just
|
||||
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
|
||||
@@ -426,8 +423,8 @@ impl Endpoint {
|
||||
None
|
||||
}
|
||||
}
|
||||
Replication::Static(lsn) => Some(*lsn),
|
||||
Replication::Replica => {
|
||||
ComputeMode::Static(lsn) => Some(*lsn),
|
||||
ComputeMode::Replica => {
|
||||
None // Take the latest snapshot available to start with
|
||||
}
|
||||
};
|
||||
@@ -526,7 +523,7 @@ impl Endpoint {
|
||||
// 3. Load basebackup
|
||||
self.load_basebackup(auth_token)?;
|
||||
|
||||
if self.replication != Replication::Primary {
|
||||
if self.mode != ComputeMode::Primary {
|
||||
File::create(self.pgdata().join("standby.signal"))?;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user