From 4d55d61807ab0b6bd2c1f698bd3386cb1d1784ab Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 2 May 2023 20:36:11 +0300 Subject: [PATCH] Store basic endpoint info in endpoint.json file. (#4058) It's more convenient than parsing the postgresql.conf file. Extracted from PR #3886. I started working on another patch (to make it safe to run two "neon_local endpoint create" commands concurrently), and realized that this change will make that simpler too. --- control_plane/src/bin/neon_local.rs | 41 +++++------ control_plane/src/endpoint.rs | 107 ++++++++++++++-------------- 2 files changed, 69 insertions(+), 79 deletions(-) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 09278e1726..0e0d71b3f1 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -8,7 +8,7 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; use control_plane::endpoint::ComputeControlPlane; -use control_plane::endpoint::Replication; +use control_plane::endpoint::ComputeMode; use control_plane::local_env::LocalEnv; use control_plane::pageserver::PageServerNode; use control_plane::safekeeper::SafekeeperNode; @@ -481,7 +481,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - timeline_id, None, pg_version, - Replication::Primary, + ComputeMode::Primary, )?; println!("Done"); } @@ -568,8 +568,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .iter() .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id) { - let lsn_str = match endpoint.replication { - Replication::Static(lsn) => { + let lsn_str = match endpoint.mode { + ComputeMode::Static(lsn) => { // -> read-only endpoint // Use the node's LSN. lsn.to_string() @@ -632,21 +632,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .copied() .unwrap_or(false); - let replication = match (lsn, hot_standby) { - (Some(lsn), false) => Replication::Static(lsn), - (None, true) => Replication::Replica, - (None, false) => Replication::Primary, + let mode = match (lsn, hot_standby) { + (Some(lsn), false) => ComputeMode::Static(lsn), + (None, true) => ComputeMode::Replica, + (None, false) => ComputeMode::Primary, (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), }; - cplane.new_endpoint( - tenant_id, - &endpoint_id, - timeline_id, - port, - pg_version, - replication, - )?; + cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, port, pg_version, mode)?; } "start" => { let port: Option = sub_args.get_one::("port").copied(); @@ -670,11 +663,11 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .unwrap_or(false); if let Some(endpoint) = endpoint { - match (&endpoint.replication, hot_standby) { - (Replication::Static(_), true) => { + match (&endpoint.mode, hot_standby) { + (ComputeMode::Static(_), true) => { bail!("Cannot start a node in hot standby mode when it is already configured as a static replica") } - (Replication::Primary, true) => { + (ComputeMode::Primary, true) => { bail!("Cannot start a node as a hot standby replica, it is already configured as primary node") } _ => {} @@ -701,10 +694,10 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .copied() .context("Failed to `pg-version` from the argument string")?; - let replication = match (lsn, hot_standby) { - (Some(lsn), false) => Replication::Static(lsn), - (None, true) => Replication::Replica, - (None, false) => Replication::Primary, + let mode = match (lsn, hot_standby) { + (Some(lsn), false) => ComputeMode::Static(lsn), + (None, true) => ComputeMode::Replica, + (None, false) => ComputeMode::Primary, (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), }; @@ -721,7 +714,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( timeline_id, port, pg_version, - replication, + mode, )?; ep.start(&auth_token)?; } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 7d3485518f..5a1f93dc99 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -11,15 +11,31 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, }; -use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION}; +use crate::local_env::LocalEnv; use crate::pageserver::PageServerNode; use crate::postgresql_conf::PostgresConf; +// contents of a endpoint.json file +#[serde_as] +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +pub struct EndpointConf { + name: String, + #[serde_as(as = "DisplayFromStr")] + tenant_id: TenantId, + #[serde_as(as = "DisplayFromStr")] + timeline_id: TimelineId, + mode: ComputeMode, + port: u16, + pg_version: u32, +} + // // ComputeControlPlane // @@ -70,7 +86,7 @@ impl ComputeControlPlane { timeline_id: TimelineId, port: Option, pg_version: u32, - replication: Replication, + mode: ComputeMode, ) -> Result> { let port = port.unwrap_or_else(|| self.get_port()); @@ -80,12 +96,22 @@ impl ComputeControlPlane { env: self.env.clone(), pageserver: Arc::clone(&self.pageserver), timeline_id, - replication, + mode, tenant_id, pg_version, }); - ep.create_pgdata()?; + std::fs::write( + ep.endpoint_path().join("endpoint.json"), + serde_json::to_string_pretty(&EndpointConf { + name: name.to_string(), + tenant_id, + timeline_id, + mode, + port, + pg_version, + })?, + )?; ep.setup_pg_conf()?; self.endpoints.insert(ep.name.clone(), Arc::clone(&ep)); @@ -96,12 +122,13 @@ impl ComputeControlPlane { /////////////////////////////////////////////////////////////////////////////// -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum Replication { +#[serde_as] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] +pub enum ComputeMode { // Regular read-write node Primary, // if recovery_target_lsn is provided, and we want to pin the node to a specific LSN - Static(Lsn), + Static(#[serde_as(as = "DisplayFromStr")] Lsn), // Hot standby; read-only replica. // Future versions may want to distinguish between replicas with hot standby // feedback and other kinds of replication configurations. @@ -115,7 +142,7 @@ pub struct Endpoint { pub tenant_id: TenantId, pub timeline_id: TimelineId, // Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary. - pub replication: Replication, + pub mode: ComputeMode, // port and address of the Postgres server pub address: SocketAddr, @@ -144,50 +171,20 @@ impl Endpoint { let fname = entry.file_name(); let name = fname.to_str().unwrap().to_string(); - // Read config file into memory - let cfg_path = entry.path().join("pgdata").join("postgresql.conf"); - let cfg_path_str = cfg_path.to_string_lossy(); - let mut conf_file = File::open(&cfg_path) - .with_context(|| format!("failed to open config file in {}", cfg_path_str))?; - let conf = PostgresConf::read(&mut conf_file) - .with_context(|| format!("failed to read config file in {}", cfg_path_str))?; - - // Read a few options from the config file - let context = format!("in config file {}", cfg_path_str); - let port: u16 = conf.parse_field("port", &context)?; - let timeline_id: TimelineId = conf.parse_field("neon.timeline_id", &context)?; - let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?; - - // Read postgres version from PG_VERSION file to determine which postgres version binary to use. - // If it doesn't exist, assume broken data directory and use default pg version. - let pg_version_path = entry.path().join("PG_VERSION"); - - let pg_version_str = - fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string()); - let pg_version = u32::from_str(&pg_version_str)?; - - // parse recovery_target_lsn and primary_conninfo into Recovery Target, if any - let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") { - Replication::Static(Lsn::from_str(lsn_str)?) - } else if let Some(slot_name) = conf.get("primary_slot_name") { - let slot_name = slot_name.to_string(); - let prefix = format!("repl_{}_", timeline_id); - assert!(slot_name.starts_with(&prefix)); - Replication::Replica - } else { - Replication::Primary - }; + // Read the endpoint.json file + let conf: EndpointConf = + serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?; // ok now Ok(Endpoint { - address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), + address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port), name, env: env.clone(), pageserver: Arc::clone(pageserver), - timeline_id, - replication, - tenant_id, - pg_version, + timeline_id: conf.timeline_id, + mode: conf.mode, + tenant_id: conf.tenant_id, + pg_version: conf.pg_version, }) } @@ -323,8 +320,8 @@ impl Endpoint { conf.append_line(""); // Replication-related configurations, such as WAL sending - match &self.replication { - Replication::Primary => { + match &self.mode { + ComputeMode::Primary => { // Configure backpressure // - Replication write lag depends on how fast the walreceiver can process incoming WAL. // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec, @@ -366,10 +363,10 @@ impl Endpoint { conf.append("synchronous_standby_names", "pageserver"); } } - Replication::Static(lsn) => { + ComputeMode::Static(lsn) => { conf.append("recovery_target_lsn", &lsn.to_string()); } - Replication::Replica => { + ComputeMode::Replica => { assert!(!self.env.safekeepers.is_empty()); // TODO: use future host field from safekeeper spec @@ -409,8 +406,8 @@ impl Endpoint { } fn load_basebackup(&self, auth_token: &Option) -> Result<()> { - let backup_lsn = match &self.replication { - Replication::Primary => { + let backup_lsn = match &self.mode { + ComputeMode::Primary => { if !self.env.safekeepers.is_empty() { // LSN 0 means that it is bootstrap and we need to download just // latest data from the pageserver. That is a bit clumsy but whole bootstrap @@ -426,8 +423,8 @@ impl Endpoint { None } } - Replication::Static(lsn) => Some(*lsn), - Replication::Replica => { + ComputeMode::Static(lsn) => Some(*lsn), + ComputeMode::Replica => { None // Take the latest snapshot available to start with } }; @@ -526,7 +523,7 @@ impl Endpoint { // 3. Load basebackup self.load_basebackup(auth_token)?; - if self.replication != Replication::Primary { + if self.mode != ComputeMode::Primary { File::create(self.pgdata().join("standby.signal"))?; }