diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 4b2aa3c957..76964e8f3b 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, Context, Result};
 use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
 use control_plane::compute::ComputeControlPlane;
+use control_plane::compute::Replication;
 use control_plane::local_env::LocalEnv;
 use control_plane::pageserver::PageServerNode;
 use control_plane::safekeeper::SafekeeperNode;
@@ -472,7 +473,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
             println!("Creating node for imported timeline ...");
             env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
 
-            cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
+            cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version, None)?;
             println!("Done");
         }
         Some(("branch", branch_match)) => {
@@ -558,20 +559,19 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
             .iter()
             .filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
         {
-            let lsn_str = match node.lsn {
-                None => {
-                    // -> primary node
+            let lsn_str = match node.replication {
+                Replication::Static(lsn) => {
+                    // -> read-only node
+                    // Use the node's LSN.
+                    lsn.to_string()
+                }
+                _ => {
                     // Use the LSN at the end of the timeline.
                     timeline_infos
                         .get(&node.timeline_id)
                         .map(|bi| bi.last_record_lsn.to_string())
                         .unwrap_or_else(|| "?".to_string())
                 }
-                Some(lsn) => {
-                    // -> read-only node
-                    // Use the node's LSN.
-                    lsn.to_string()
-                }
             };
 
             let branch_name = timeline_name_mappings
@@ -617,7 +617,17 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
                 .copied()
                 .context("Failed to parse postgres version from the argument string")?;
 
-            cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
+            let replica = sub_args.get_one::<String>("replicates").map(|s| s.as_str());
+
+            cplane.new_node(
+                tenant_id,
+                &node_name,
+                timeline_id,
+                lsn,
+                port,
+                pg_version,
+                replica,
+            )?;
         }
         "start" => {
             let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -635,7 +645,20 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
                 None
             };
 
+            let replica = sub_args.get_one::<String>("replicates").map(|s| s.as_str());
+
             if let Some(node) = node {
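+                // Note: `pg start` does not reconfigure an existing node; its
+                // replication mode was fixed at creation time and is re-read
+                // from its postgresql.conf, so a conflicting --replicates flag
+                // is only reported, never applied.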
+                if let Some(repl) = replica {
+                    if let Replication::Replica(source) = &node.replication {
+                        if source != repl {
+                            println!("replication ignored: node is already configured to replicate another node")
+                        }
+                    } else {
+                        println!(
+                            "replication ignored: pre-existing node was not configured as a replica"
+                        )
+                    }
+                }
                 println!("Starting existing postgres {node_name}...");
                 node.start(&auth_token)?;
             } else {
@@ -664,8 +687,15 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
                 // start <-- will also use port X even without explicit port argument
                 println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
 
-                let node =
-                    cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
+                let node = cplane.new_node(
+                    tenant_id,
+                    node_name,
+                    timeline_id,
+                    lsn,
+                    port,
+                    pg_version,
+                    replica,
+                )?;
                 node.start(&auth_token)?;
             }
         }
@@ -918,6 +948,12 @@ fn cli() -> Command {
         .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
         .required(false);
 
+    let replicates_arg = Arg::new("replicates")
+        .short('r')
+        .long("replicates")
+        .help("If set, the node will be a hot standby replica of the node with the given name")
+        .required(false);
+
     Command::new("Neon CLI")
        .arg_required_else_help(true)
        .version(GIT_VERSION)
@@ -1042,6 +1078,7 @@ fn cli() -> Command {
                    .long("config-only")
                    .required(false))
                .arg(pg_version_arg.clone())
+                .arg(replicates_arg.clone())
            )
            .subcommand(Command::new("start")
                .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
@@ -1052,6 +1089,7 @@ fn cli() -> Command {
                .arg(lsn_arg)
                .arg(port_arg)
                .arg(pg_version_arg)
+                .arg(replicates_arg.clone())
            )
            .subcommand(
                Command::new("stop")
diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index 8731cf2583..b09f5dadc8 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -81,8 +81,17 @@ impl ComputeControlPlane {
         lsn: Option<Lsn>,
         port: Option<u16>,
         pg_version: u32,
+        standby_of: Option<&str>,
     ) -> Result<Arc<PostgresNode>> {
         let port = port.unwrap_or_else(|| self.get_port());
+
+        let replication = match (lsn, standby_of) {
+            (Some(lsn), None) => Replication::Static(lsn),
+            (None, Some(standby_of)) => Replication::Replica(standby_of.to_owned()),
+            (None, None) => Replication::Primary,
+            (Some(_), Some(_)) => anyhow::bail!("cannot specify both lsn and standby_of"),
+        };
+
         let node = Arc::new(PostgresNode {
             name: name.to_owned(),
             address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -90,7 +99,7 @@ impl ComputeControlPlane {
             pageserver: Arc::clone(&self.pageserver),
             is_test: false,
             timeline_id,
-            lsn,
+            replication,
             tenant_id,
             uses_wal_proposer: false,
             pg_version,
@@ -108,6 +117,16 @@ impl ComputeControlPlane {
 
 ///////////////////////////////////////////////////////////////////////////////
 
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub enum Replication {
+    // Regular read-write node
+    Primary,
+    // Read-only node pinned to the given LSN (via recovery_target_lsn)
+    Static(Lsn),
+    // Hot standby of the named node, running on the same timeline
+    Replica(String),
+}
+
 #[derive(Debug)]
 pub struct PostgresNode {
     pub address: SocketAddr,
@@ -116,7 +135,7 @@ pub struct PostgresNode {
     pageserver: Arc<PageServerNode>,
     is_test: bool,
     pub timeline_id: TimelineId,
-    pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
+    pub replication: Replication,
     pub tenant_id: TenantId,
     uses_wal_proposer: bool,
     pg_version: u32,
@@ -162,9 +181,17 @@ impl PostgresNode {
             fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
         let pg_version = u32::from_str(&pg_version_str)?;
 
-        // parse recovery_target_lsn, if any
-        let recovery_target_lsn: Option<Lsn> =
-            conf.parse_field_optional("recovery_target_lsn", &context)?;
+        // parse recovery_target_lsn and primary_slot_name back into a Replication mode, if any
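+        // A replica's primary_slot_name is generated below as
+        // "repl_<timeline_id>_<node_name>_<source>", so the source node's
+        // name can be recovered from the slot name's suffix here.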
+        let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
+            Replication::Static(Lsn::from_str(lsn_str)?)
+        } else if let Some(slot_name) = conf.get("primary_slot_name") {
+            let slot_name = slot_name.to_string();
+            let prefix = format!("repl_{}_{}_", timeline_id, name);
+            assert!(slot_name.starts_with(&prefix));
+            Replication::Replica(slot_name[prefix.len()..].to_owned())
+        } else {
+            Replication::Primary
+        };
 
         // ok now
         Ok(PostgresNode {
@@ -174,7 +201,7 @@ impl PostgresNode {
             pageserver: Arc::clone(pageserver),
             is_test: false,
             timeline_id,
-            lsn: recovery_target_lsn,
+            replication,
             tenant_id,
             uses_wal_proposer,
             pg_version,
@@ -327,50 +354,74 @@ impl PostgresNode {
         }
         conf.append("neon.tenant_id", &self.tenant_id.to_string());
         conf.append("neon.timeline_id", &self.timeline_id.to_string());
-        if let Some(lsn) = self.lsn {
-            conf.append("recovery_target_lsn", &lsn.to_string());
-        }
 
         conf.append_line("");
-        // Configure backpressure
-        // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
-        //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
-        //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
-        //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
-        //   updates pages are not requested from pageserver.
-        // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
-        //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
-        //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
-        //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
-        // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
-        //   To be able to restore database in case of pageserver node crash, safekeeper should not
-        //   remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
-        //   (if they are not able to upload WAL to S3).
-        conf.append("max_replication_write_lag", "15MB");
-        conf.append("max_replication_flush_lag", "10GB");
+        // Replication-related configuration, such as WAL sending
+        match &self.replication {
+            Replication::Primary => {
+                // Configure backpressure
+                // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
+                //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
+                //   so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
+                //   Actually latency should be much smaller (better if < 1sec). But we assume that recently
+                //   updated pages are not requested from pageserver.
+                // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
+                //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
+                //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
+                //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
+                // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
+                //   To be able to restore database in case of pageserver node crash, safekeeper should not
+                //   remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
+                //   (if they are not able to upload WAL to S3).
+                conf.append("max_replication_write_lag", "15MB");
+                conf.append("max_replication_flush_lag", "10GB");
 
-        if !self.env.safekeepers.is_empty() {
-            // Configure the node to connect to the safekeepers
-            conf.append("synchronous_standby_names", "walproposer");
+                if !self.env.safekeepers.is_empty() {
+                    // Configure the node to connect to the safekeepers
+                    conf.append("synchronous_standby_names", "walproposer");
 
-            let safekeepers = self
-                .env
-                .safekeepers
-                .iter()
-                .map(|sk| format!("localhost:{}", sk.pg_port))
-                .collect::<Vec<String>>()
-                .join(",");
-            conf.append("neon.safekeepers", &safekeepers);
-        } else {
-            // We only use setup without safekeepers for tests,
-            // and don't care about data durability on pageserver,
-            // so set more relaxed synchronous_commit.
-            conf.append("synchronous_commit", "remote_write");
+                    let safekeepers = self
+                        .env
+                        .safekeepers
+                        .iter()
+                        .map(|sk| format!("localhost:{}", sk.pg_port))
+                        .collect::<Vec<String>>()
+                        .join(",");
+                    conf.append("neon.safekeepers", &safekeepers);
+                } else {
+                    // We only use setup without safekeepers for tests,
+                    // and don't care about data durability on pageserver,
+                    // so set more relaxed synchronous_commit.
+                    conf.append("synchronous_commit", "remote_write");
 
-            // Configure the node to stream WAL directly to the pageserver
-            // This isn't really a supported configuration, but can be useful for
-            // testing.
-            conf.append("synchronous_standby_names", "pageserver");
+                    // Configure the node to stream WAL directly to the pageserver
+                    // This isn't really a supported configuration, but can be useful for
+                    // testing.
+                    conf.append("synchronous_standby_names", "pageserver");
+                }
+            }
+            Replication::Static(lsn) => {
+                conf.append("recovery_target_lsn", &lsn.to_string());
+            }
+            Replication::Replica(of) => {
+                assert!(!self.env.safekeepers.is_empty());
+
+                // TODO: use future host field from safekeeper spec
+                // Q: What component should be responsible for providing the connection string?
+                // Q: How to handle the case when safekeeper failed / restarted?
+                let connstr = format!(
+                    "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
+                    "localhost",
+                    self.env.safekeepers[0].pg_port,
+                    &self.timeline_id.to_string(),
+                    &self.tenant_id.to_string(),
+                );
+
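+                // NOTE: the slot name encodes the timeline, this node, and the
+                // node it replicates; keep it in sync with the primary_slot_name
+                // parsing above, which recovers the source node's name on reload.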
+ conf.append("max_replication_write_lag", "15MB"); + conf.append("max_replication_flush_lag", "10GB"); - if !self.env.safekeepers.is_empty() { - // Configure the node to connect to the safekeepers - conf.append("synchronous_standby_names", "walproposer"); + if !self.env.safekeepers.is_empty() { + // Configure the node to connect to the safekeepers + conf.append("synchronous_standby_names", "walproposer"); - let safekeepers = self - .env - .safekeepers - .iter() - .map(|sk| format!("localhost:{}", sk.pg_port)) - .collect::>() - .join(","); - conf.append("neon.safekeepers", &safekeepers); - } else { - // We only use setup without safekeepers for tests, - // and don't care about data durability on pageserver, - // so set more relaxed synchronous_commit. - conf.append("synchronous_commit", "remote_write"); + let safekeepers = self + .env + .safekeepers + .iter() + .map(|sk| format!("localhost:{}", sk.pg_port)) + .collect::>() + .join(","); + conf.append("neon.safekeepers", &safekeepers); + } else { + // We only use setup without safekeepers for tests, + // and don't care about data durability on pageserver, + // so set more relaxed synchronous_commit. + conf.append("synchronous_commit", "remote_write"); - // Configure the node to stream WAL directly to the pageserver - // This isn't really a supported configuration, but can be useful for - // testing. - conf.append("synchronous_standby_names", "pageserver"); + // Configure the node to stream WAL directly to the pageserver + // This isn't really a supported configuration, but can be useful for + // testing. + conf.append("synchronous_standby_names", "pageserver"); + } + } + Replication::Static(lsn) => { + conf.append("recovery_target_lsn", &lsn.to_string()); + } + Replication::Replica(of) => { + assert!(!self.env.safekeepers.is_empty()); + + // TODO: use future host field from safekeeper spec + // Q: What component should be responsible for providing the connection string? + // Q: How to handle the case when safekeeper failed / restarted? + let connstr = format!( + "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true", + "localhost", + self.env.safekeepers[0].pg_port, + &self.timeline_id.to_string(), + &self.tenant_id.to_string(), + ); + + let slot_name = format!("repl_{}_{}_{}", self.timeline_id, self.name, of); + conf.append("primary_conninfo", connstr.as_str()); + conf.append("primary_slot_name", slot_name.as_str()); + conf.append("hot_standby", "on"); + } } let mut file = File::create(self.pgdata().join("postgresql.conf"))?; @@ -383,21 +434,27 @@ impl PostgresNode { } fn load_basebackup(&self, auth_token: &Option) -> Result<()> { - let backup_lsn = if let Some(lsn) = self.lsn { - Some(lsn) - } else if self.uses_wal_proposer { - // LSN 0 means that it is bootstrap and we need to download just - // latest data from the pageserver. That is a bit clumsy but whole bootstrap - // procedure evolves quite actively right now, so let's think about it again - // when things would be more stable (TODO). - let lsn = self.sync_safekeepers(auth_token, self.pg_version)?; - if lsn == Lsn(0) { - None - } else { - Some(lsn) + let backup_lsn = match &self.replication { + Replication::Primary => { + if self.uses_wal_proposer { + // LSN 0 means that it is bootstrap and we need to download just + // latest data from the pageserver. That is a bit clumsy but whole bootstrap + // procedure evolves quite actively right now, so let's think about it again + // when things would be more stable (TODO). 
+        if self.replication != Replication::Primary {
             File::create(self.pgdata().join("standby.signal"))?;
         }
 
diff --git a/control_plane/src/postgresql_conf.rs b/control_plane/src/postgresql_conf.rs
index 34dc769e78..638575eb82 100644
--- a/control_plane/src/postgresql_conf.rs
+++ b/control_plane/src/postgresql_conf.rs
@@ -13,7 +13,7 @@ use std::io::BufRead;
 use std::str::FromStr;
 
 /// In-memory representation of a postgresql.conf file
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct PostgresConf {
     lines: Vec<String>,
     hash: HashMap<String, String>,
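
--
Usage sketch (node names are illustrative): create a primary, then a hot
standby that follows it through the safekeepers on the same timeline:

    neon_local pg start main
    neon_local pg start replica1 --replicates main

Afterwards, `pg list` shows the standby at the end-of-timeline LSN, while a
node created with --lsn keeps reporting its pinned LSN.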